diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-30 10:11:25 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-30 10:11:25 +0200 |
| commit | c7a0ddf9f26fdd647574e4031a66362234298e7a (patch) | |
| tree | 5e3eba1856dc160ca0beb2bc5edaf1ea490a0a6f /src/zenstore/filecas.cpp | |
| parent | miscellaneous minor bugfixes (#66) (diff) | |
| download | zen-c7a0ddf9f26fdd647574e4031a66362234298e7a.tar.xz zen-c7a0ddf9f26fdd647574e4031a66362234298e7a.zip | |
fix get project files loop (#68)
- Bugfix: Remove extra loop causing GetProjectFiles for project store to find all chunks once for each chunk found
- Bugfix: Don't capture ChunkIndex variable in CasImpl::IterateChunks by reference as it causes crash
- Improvement: Make FileCasStrategy::IterateChunks (optionally) multithreaded (improves GetProjectFiles performance)
Diffstat (limited to 'src/zenstore/filecas.cpp')
| -rw-r--r-- | src/zenstore/filecas.cpp | 49 |
1 files changed, 42 insertions, 7 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index df039d4b6..1c6aa539a 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -16,6 +16,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zencore/uid.h> +#include <zencore/workthreadpool.h> #include <zenstore/gc.h> #include <zenstore/scrubcontext.h> #include <zenutil/basicfile.h> @@ -810,7 +811,9 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) } bool -FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback) +FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) { std::vector<size_t> FoundChunkIndexes; { @@ -823,19 +826,51 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::functio } } } - bool Continue = true; - for (size_t ChunkIndex : FoundChunkIndexes) + std::atomic_bool Continue = true; + if (!FoundChunkIndexes.empty()) { - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); - if (Payload) + auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) { + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]); + IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + if (Payload) + { + if (!AsyncCallback(ChunkIndex, std::move(Payload))) + { + return false; + } + } + return true; + }; + + Latch WorkLatch(1); + for (size_t ChunkIndex : FoundChunkIndexes) { - Continue = Callback(ChunkIndex, std::move(Payload)); if (!Continue) { break; } + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&WorkLatch, &ProcessOne, ChunkIndex, &Continue]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (!Continue) + { + return; + } + if (!ProcessOne(ChunkIndex)) + { + Continue = false; + } + }); + } + else + { + Continue = Continue && ProcessOne(ChunkIndex); + } } + WorkLatch.CountDown(); + WorkLatch.Wait(); } return Continue; } |