aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/cas.cpp13
-rw-r--r--src/zenstore/filecas.cpp49
-rw-r--r--src/zenstore/filecas.h12
3 files changed, 57 insertions, 17 deletions
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 45d7dd277..67790e2c6 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -430,11 +430,14 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
{
return false;
}
- if (!m_LargeStrategy.IterateChunks(DecompressedIds, [&](size_t Index, const IoBuffer& Payload) {
- IoBuffer Chunk(Payload);
- Chunk.SetContentType(ZenContentType::kCompressedBinary);
- return AsyncCallback(Index, Payload);
- }))
+ if (!m_LargeStrategy.IterateChunks(
+ DecompressedIds,
+ [&](size_t Index, const IoBuffer& Payload) {
+ IoBuffer Chunk(Payload);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+ return AsyncCallback(Index, Payload);
+ },
+ OptionalWorkerPool))
{
return false;
}
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index df039d4b6..1c6aa539a 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -16,6 +16,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zencore/uid.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/basicfile.h>
@@ -810,7 +811,9 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
}
bool
-FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback)
+FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool)
{
std::vector<size_t> FoundChunkIndexes;
{
@@ -823,19 +826,51 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::functio
}
}
}
- bool Continue = true;
- for (size_t ChunkIndex : FoundChunkIndexes)
+ std::atomic_bool Continue = true;
+ if (!FoundChunkIndexes.empty())
{
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
- if (Payload)
+ auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) {
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]);
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
+ if (Payload)
+ {
+ if (!AsyncCallback(ChunkIndex, std::move(Payload)))
+ {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ Latch WorkLatch(1);
+ for (size_t ChunkIndex : FoundChunkIndexes)
{
- Continue = Callback(ChunkIndex, std::move(Payload));
if (!Continue)
{
break;
}
+ if (OptionalWorkerPool)
+ {
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([&WorkLatch, &ProcessOne, ChunkIndex, &Continue]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (!Continue)
+ {
+ return;
+ }
+ if (!ProcessOne(ChunkIndex))
+ {
+ Continue = false;
+ }
+ });
+ }
+ else
+ {
+ Continue = Continue && ProcessOne(ChunkIndex);
+ }
}
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
return Continue;
}
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
index 06e35de23..07fc36954 100644
--- a/src/zenstore/filecas.h
+++ b/src/zenstore/filecas.h
@@ -39,11 +39,13 @@ struct FileCasStrategy final : public GcStorage, public GcReferenceStore
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(HashKeySet& InOutChunks);
- bool IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback);
- void Flush();
- virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
- virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override;
+ bool IterateChunks(std::span<IoHash> ChunkHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool);
+ void Flush();
+ virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
+ virtual void CollectGarbage(GcContext& GcCtx) override;
+ virtual GcStorageSize StorageSize() const override;
virtual std::string GetGcName(GcCtx& Ctx) override;
virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override;