aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/filecas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-30 10:11:25 +0200
committerGitHub Enterprise <[email protected]>2024-04-30 10:11:25 +0200
commitc7a0ddf9f26fdd647574e4031a66362234298e7a (patch)
tree5e3eba1856dc160ca0beb2bc5edaf1ea490a0a6f /src/zenstore/filecas.cpp
parentmiscellaneous minor bugfixes (#66) (diff)
downloadzen-c7a0ddf9f26fdd647574e4031a66362234298e7a.tar.xz
zen-c7a0ddf9f26fdd647574e4031a66362234298e7a.zip
fix get project files loop (#68)
- Bugfix: Remove extra loop causing GetProjectFiles for project store to find all chunks once for each chunk found - Bugfix: Don't capture ChunkIndex variable in CasImpl::IterateChunks by reference as it causes crash - Improvement: Make FileCasStrategy::IterateChunks (optionally) multithreaded (improves GetProjectFiles performance)
Diffstat (limited to 'src/zenstore/filecas.cpp')
-rw-r--r--src/zenstore/filecas.cpp49
1 files changed, 42 insertions, 7 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index df039d4b6..1c6aa539a 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -16,6 +16,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zencore/uid.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/basicfile.h>
@@ -810,7 +811,9 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
}
bool
-FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback)
+FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool)
{
std::vector<size_t> FoundChunkIndexes;
{
@@ -823,19 +826,51 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::functio
}
}
}
- bool Continue = true;
- for (size_t ChunkIndex : FoundChunkIndexes)
+ std::atomic_bool Continue = true;
+ if (!FoundChunkIndexes.empty())
{
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
- if (Payload)
+ auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) {
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]);
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
+ if (Payload)
+ {
+ if (!AsyncCallback(ChunkIndex, std::move(Payload)))
+ {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ Latch WorkLatch(1);
+ for (size_t ChunkIndex : FoundChunkIndexes)
{
- Continue = Callback(ChunkIndex, std::move(Payload));
if (!Continue)
{
break;
}
+ if (OptionalWorkerPool)
+ {
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([&WorkLatch, &ProcessOne, ChunkIndex, &Continue]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (!Continue)
+ {
+ return;
+ }
+ if (!ProcessOne(ChunkIndex))
+ {
+ Continue = false;
+ }
+ });
+ }
+ else
+ {
+ Continue = Continue && ProcessOne(ChunkIndex);
+ }
}
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
return Continue;
}