diff options
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/cas.cpp | 13 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 49 | ||||
| -rw-r--r-- | src/zenstore/filecas.h | 12 |
3 files changed, 57 insertions, 17 deletions
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 45d7dd277..67790e2c6 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -430,11 +430,14 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, { return false; } - if (!m_LargeStrategy.IterateChunks(DecompressedIds, [&](size_t Index, const IoBuffer& Payload) { - IoBuffer Chunk(Payload); - Chunk.SetContentType(ZenContentType::kCompressedBinary); - return AsyncCallback(Index, Payload); - })) + if (!m_LargeStrategy.IterateChunks( + DecompressedIds, + [&](size_t Index, const IoBuffer& Payload) { + IoBuffer Chunk(Payload); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + return AsyncCallback(Index, Payload); + }, + OptionalWorkerPool)) { return false; } diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index df039d4b6..1c6aa539a 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -16,6 +16,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zencore/uid.h> +#include <zencore/workthreadpool.h> #include <zenstore/gc.h> #include <zenstore/scrubcontext.h> #include <zenutil/basicfile.h> @@ -810,7 +811,9 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) } bool -FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback) +FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) { std::vector<size_t> FoundChunkIndexes; { @@ -823,19 +826,51 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::functio } } } - bool Continue = true; - for (size_t ChunkIndex : FoundChunkIndexes) + std::atomic_bool Continue = true; + if (!FoundChunkIndexes.empty()) { - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); - if (Payload) + auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) { + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]); + IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + if (Payload) + { + if (!AsyncCallback(ChunkIndex, std::move(Payload))) + { + return false; + } + } + return true; + }; + + Latch WorkLatch(1); + for (size_t ChunkIndex : FoundChunkIndexes) { - Continue = Callback(ChunkIndex, std::move(Payload)); if (!Continue) { break; } + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&WorkLatch, &ProcessOne, ChunkIndex, &Continue]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (!Continue) + { + return; + } + if (!ProcessOne(ChunkIndex)) + { + Continue = false; + } + }); + } + else + { + Continue = Continue && ProcessOne(ChunkIndex); + } } + WorkLatch.CountDown(); + WorkLatch.Wait(); } return Continue; } diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h index 06e35de23..07fc36954 100644 --- a/src/zenstore/filecas.h +++ b/src/zenstore/filecas.h @@ -39,11 +39,13 @@ struct FileCasStrategy final : public GcStorage, public GcReferenceStore IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(HashKeySet& InOutChunks); - bool IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback); - void Flush(); - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; - virtual void CollectGarbage(GcContext& GcCtx) override; - virtual GcStorageSize StorageSize() const override; + bool IterateChunks(std::span<IoHash> ChunkHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool); + void Flush(); + virtual void ScrubStorage(ScrubContext& ScrubCtx) override; + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override; virtual std::string GetGcName(GcCtx& Ctx) override; virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override; |