aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp108
-rw-r--r--src/zen/cmds/cache_cmd.cpp41
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp3
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp3
-rw-r--r--src/zen/cmds/wipe_cmd.cpp2
-rw-r--r--src/zencore/filesystem.cpp42
-rw-r--r--src/zencore/include/zencore/workthreadpool.h23
-rw-r--r--src/zencore/jobqueue.cpp68
-rw-r--r--src/zencore/workthreadpool.cpp151
-rw-r--r--src/zencore/zencore.cpp7
-rw-r--r--src/zenhttp/servers/httpsys.cpp2
-rw-r--r--src/zenhttp/transports/winsocktransport.cpp26
-rw-r--r--src/zenserver-test/zenserver-test.cpp95
-rw-r--r--src/zenserver/admin/admin.cpp2
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp2
-rw-r--r--src/zenserver/projectstore/projectstore.cpp29
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp1543
-rw-r--r--src/zenstore/blockstore.cpp212
-rw-r--r--src/zenstore/buildstore/buildstore.cpp2
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp7
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp104
-rw-r--r--src/zenstore/cas.cpp19
-rw-r--r--src/zenstore/compactcas.cpp218
-rw-r--r--src/zenstore/filecas.cpp2
-rw-r--r--src/zenstore/gc.cpp188
-rw-r--r--src/zenstore/workspaces.cpp24
-rw-r--r--src/zenutil/buildstoragecache.cpp28
-rw-r--r--src/zenutil/chunkedcontent.cpp2
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h39
-rw-r--r--src/zenutil/parallelwork.cpp31
30 files changed, 1602 insertions, 1421 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index d858fa328..1e097edb7 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -635,7 +635,7 @@ namespace {
std::atomic<uint64_t> DiscoveredItemCount = 0;
std::atomic<uint64_t> DeletedItemCount = 0;
std::atomic<uint64_t> DeletedByteCount = 0;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
struct AsyncVisitor : public GetDirectoryContentVisitor
{
@@ -2210,7 +2210,7 @@ namespace {
WorkerThreadPool& NetworkPool = GetNetworkPool();
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
const std::filesystem::path TempFolder = ".zen-tmp";
@@ -2654,7 +2654,7 @@ namespace {
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
@@ -2888,7 +2888,7 @@ namespace {
FilteredRate FilteredCompressedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
std::atomic<size_t> UploadedBlockSize = 0;
std::atomic<size_t> UploadedBlockCount = 0;
@@ -3541,56 +3541,58 @@ namespace {
FindBlocksStatistics FindBlocksStats;
- std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask(std::packaged_task<PrepareBuildResult()>{
- [&Storage, BuildId, FindBlockMaxCount, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] {
- ZEN_TRACE_CPU("PrepareBuild");
+ std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask(
+ std::packaged_task<PrepareBuildResult()>{
+ [&Storage, BuildId, FindBlockMaxCount, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] {
+ ZEN_TRACE_CPU("PrepareBuild");
- PrepareBuildResult Result;
- Stopwatch Timer;
- if (CreateBuild)
- {
- ZEN_TRACE_CPU("CreateBuild");
-
- Stopwatch PutBuildTimer;
- CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData);
- Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
- Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
- Result.PayloadSize = MetaData.GetSize();
- }
- else
- {
- ZEN_TRACE_CPU("PutBuild");
- Stopwatch GetBuildTimer;
- CbObject Build = Storage.BuildStorage->GetBuild(BuildId);
- Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
- Result.PayloadSize = Build.GetSize();
- if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ PrepareBuildResult Result;
+ Stopwatch Timer;
+ if (CreateBuild)
{
- Result.PreferredMultipartChunkSize = ChunkSize;
+ ZEN_TRACE_CPU("CreateBuild");
+
+ Stopwatch PutBuildTimer;
+ CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData);
+ Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
+ Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
+ Result.PayloadSize = MetaData.GetSize();
}
- else if (AllowMultiparts)
+ else
{
- ZEN_CONSOLE_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
- NiceBytes(Result.PreferredMultipartChunkSize));
+ ZEN_TRACE_CPU("PutBuild");
+ Stopwatch GetBuildTimer;
+ CbObject Build = Storage.BuildStorage->GetBuild(BuildId);
+ Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
+ Result.PayloadSize = Build.GetSize();
+ if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ Result.PreferredMultipartChunkSize = ChunkSize;
+ }
+ else if (AllowMultiparts)
+ {
+ ZEN_CONSOLE_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
+ NiceBytes(Result.PreferredMultipartChunkSize));
+ }
}
- }
- if (!IgnoreExistingBlocks)
- {
- ZEN_TRACE_CPU("FindBlocks");
- Stopwatch KnownBlocksTimer;
- CbObject BlockDescriptionList = Storage.BuildStorage->FindBlocks(BuildId, FindBlockMaxCount);
- if (BlockDescriptionList)
+ if (!IgnoreExistingBlocks)
{
- Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList);
+ ZEN_TRACE_CPU("FindBlocks");
+ Stopwatch KnownBlocksTimer;
+ CbObject BlockDescriptionList = Storage.BuildStorage->FindBlocks(BuildId, FindBlockMaxCount);
+ if (BlockDescriptionList)
+ {
+ Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList);
+ }
+ FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
+ FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
+ Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
}
- FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
- FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
- Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
- }
- Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
- return Result;
- }});
+ Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ return Result;
+ }},
+ WorkerThreadPool::EMode::EnableBacklog);
ChunkedFolderContent LocalContent;
@@ -4593,7 +4595,7 @@ namespace {
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size());
@@ -5694,7 +5696,7 @@ namespace {
Stopwatch Timer;
auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
std::atomic<uint64_t> CompletedPathCount = 0;
uint32_t PathIndex = 0;
@@ -6006,7 +6008,7 @@ namespace {
ScavengedPaths.resize(ScavengePathCount);
ProgressBar ScavengeProgressBar(ProgressMode, "Scavenging");
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
std::atomic<uint64_t> PathsFound(0);
std::atomic<uint64_t> ChunksFound(0);
@@ -6474,7 +6476,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing");
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
struct LooseChunkHashWorkData
{
@@ -8124,7 +8126,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data");
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t LocalPathIndex : FilesToCache)
{
@@ -8211,7 +8213,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State");
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
OutLocalFolderState.Paths.resize(RemoteContent.Paths.size());
OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size());
@@ -11931,7 +11933,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return true;
};
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
uint32_t Randomizer = 0;
auto FileSizeIt = DownloadContent.FileSizes.begin();
diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp
index ea7ad79ee..aa23373c4 100644
--- a/src/zen/cmds/cache_cmd.cpp
+++ b/src/zen/cmds/cache_cmd.cpp
@@ -478,25 +478,28 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
std::span<uint64_t> BatchSizes = std::span<uint64_t>(Sizes).subspan(Offset, Min(Max(SizeCount, 1u), Sizes.size() - Offset));
WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&, BatchSizes, RequestIndex]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- CbPackage Package;
- if (m_MaxAttachmentCount > 0 && SizeCount > 0)
- {
- auto Request = GeneratePutCacheRecordRequest(BatchSizes, RequestIndex);
- ZEN_ASSERT(Request.Format(Package));
- }
- else
- {
- auto Request = GeneratePutCacheValueRequest(BatchSizes, RequestIndex);
- ZEN_ASSERT(Request.Format(Package));
- }
-
- if (HttpClient::Response Response = Http.Post("/z$/$rpc", Package, HttpClient::Accept(ZenContentType::kCbPackage)); !Response)
- {
- ZEN_CONSOLE("{}", Response.ErrorMessage(fmt::format("{}: ", RequestIndex)));
- }
- });
+ WorkerPool.ScheduleWork(
+ [&, BatchSizes, RequestIndex]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ CbPackage Package;
+ if (m_MaxAttachmentCount > 0 && SizeCount > 0)
+ {
+ auto Request = GeneratePutCacheRecordRequest(BatchSizes, RequestIndex);
+ ZEN_ASSERT(Request.Format(Package));
+ }
+ else
+ {
+ auto Request = GeneratePutCacheValueRequest(BatchSizes, RequestIndex);
+ ZEN_ASSERT(Request.Format(Package));
+ }
+
+ if (HttpClient::Response Response = Http.Post("/z$/$rpc", Package, HttpClient::Accept(ZenContentType::kCbPackage));
+ !Response)
+ {
+ ZEN_CONSOLE("{}", Response.ErrorMessage(fmt::format("{}: ", RequestIndex)));
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
Offset += BatchSizes.size();
RequestIndex++;
}
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index b7c0e0b7f..8ed52c764 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -2081,7 +2081,8 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
ZEN_CONSOLE_ERROR("Failed writing file to '{}'. Reason: '{}'", TargetPath, Ex.what());
}
}
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
}
};
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index 4e672eea5..fd6c80e98 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -501,7 +501,8 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
EntryIndex = EntryOffset.fetch_add(m_Stride);
}
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (!WorkLatch.Wait(1000))
diff --git a/src/zen/cmds/wipe_cmd.cpp b/src/zen/cmds/wipe_cmd.cpp
index d2bc2f5c4..7424f2166 100644
--- a/src/zen/cmds/wipe_cmd.cpp
+++ b/src/zen/cmds/wipe_cmd.cpp
@@ -251,7 +251,7 @@ namespace {
return Added;
};
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
struct AsyncVisitor : public GetDirectoryContentVisitor
{
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 8327838c9..5125beeca 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -2439,26 +2439,28 @@ GetDirectoryContent(const std::filesystem::path& RootDir,
PendingWorkCount.AddCount(1);
try
{
- WorkerPool.ScheduleWork([WorkerPool = &WorkerPool,
- PendingWorkCount = &PendingWorkCount,
- Visitor = Visitor,
- Flags = Flags,
- Path = std::move(Path),
- RelativeRoot = RelativeRoot / DirectoryName]() {
- ZEN_ASSERT(Visitor);
- auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); });
- try
- {
- MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor);
- FileSystemTraversal Traversal;
- Traversal.TraverseFileSystem(Path, SubVisitor);
- Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content));
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what());
- }
- });
+ WorkerPool.ScheduleWork(
+ [WorkerPool = &WorkerPool,
+ PendingWorkCount = &PendingWorkCount,
+ Visitor = Visitor,
+ Flags = Flags,
+ Path = std::move(Path),
+ RelativeRoot = RelativeRoot / DirectoryName]() {
+ ZEN_ASSERT(Visitor);
+ auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); });
+ try
+ {
+ MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor);
+ FileSystemTraversal Traversal;
+ Traversal.TraverseFileSystem(Path, SubVisitor);
+ Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content));
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
catch (const std::exception Ex)
{
diff --git a/src/zencore/include/zencore/workthreadpool.h b/src/zencore/include/zencore/workthreadpool.h
index 62356495c..4c38dd651 100644
--- a/src/zencore/include/zencore/workthreadpool.h
+++ b/src/zencore/include/zencore/workthreadpool.h
@@ -18,11 +18,7 @@ struct IWork : public RefCounted
{
virtual void Execute() = 0;
- inline std::exception_ptr GetException() { return m_Exception; }
-
private:
- std::exception_ptr m_Exception;
-
friend class WorkerThreadPool;
};
@@ -35,13 +31,18 @@ public:
WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName);
~WorkerThreadPool();
- void ScheduleWork(Ref<IWork> Work);
- void ScheduleWork(std::function<void()>&& Work);
+ // Decides what to do if there are no free workers in the pool when the work is submitted
+ enum class EMode
+ {
+ EnableBacklog, // The work will be added to a backlog of work to do
+ DisableBacklog // The work will be executed synchronously in the caller thread
+ };
+
+ void ScheduleWork(Ref<IWork> Work, EMode Mode);
+ void ScheduleWork(std::function<void()>&& Work, EMode Mode);
template<typename Func>
- auto EnqueueTask(std::packaged_task<Func> Task);
-
- [[nodiscard]] size_t PendingWorkItemCount() const;
+ auto EnqueueTask(std::packaged_task<Func> Task, EMode Mode);
private:
struct Impl;
@@ -54,7 +55,7 @@ private:
template<typename Func>
auto
-WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task)
+WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task, EMode Mode)
{
struct FutureWork : IWork
{
@@ -67,7 +68,7 @@ WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task)
Ref<FutureWork> Work{new FutureWork(std::move(Task))};
auto Future = Work->m_Task.get_future();
- ScheduleWork(std::move(Work));
+ ScheduleWork(std::move(Work), Mode);
return Future;
}
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index 5d727b69c..4aa8c113e 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -109,10 +109,12 @@ public:
WorkerCounter.AddCount(1);
try
{
- WorkerPool.ScheduleWork([&]() {
- auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); });
- Worker();
- });
+ WorkerPool.ScheduleWork(
+ [&]() {
+ auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); });
+ Worker();
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
return {.Id = NewJobId};
}
catch (const std::exception& Ex)
@@ -466,34 +468,36 @@ TEST_CASE("JobQueue")
for (uint32_t I = 0; I < 100; I++)
{
JobsLatch.AddCount(1);
- Pool.ScheduleWork([&Queue, &JobsLatch, I]() {
- auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
- JobsLatch.AddCount(1);
- auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) {
- auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
- if (Context.IsCancelled())
- {
- return;
- }
- Context.ReportProgress("going to sleep", "", 100, 100);
- Sleep(10);
- if (Context.IsCancelled())
- {
- return;
- }
- Context.ReportProgress("going to sleep again", "", 100, 50);
- if ((I & 0xFF) == 0x10)
- {
- zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
- }
- Sleep(10);
- if (Context.IsCancelled())
- {
- return;
- }
- Context.ReportProgress("done", "", 100, 0);
- });
- });
+ Pool.ScheduleWork(
+ [&Queue, &JobsLatch, I]() {
+ auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
+ JobsLatch.AddCount(1);
+ auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) {
+ auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
+ if (Context.IsCancelled())
+ {
+ return;
+ }
+ Context.ReportProgress("going to sleep", "", 100, 100);
+ Sleep(10);
+ if (Context.IsCancelled())
+ {
+ return;
+ }
+ Context.ReportProgress("going to sleep again", "", 100, 50);
+ if ((I & 0xFF) == 0x10)
+ {
+ zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
+ }
+ Sleep(10);
+ if (Context.IsCancelled())
+ {
+ return;
+ }
+ Context.ReportProgress("done", "", 100, 0);
+ });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string {
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index 445fe939e..e241c0de8 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -5,6 +5,7 @@
#include <zencore/blockingqueue.h>
#include <zencore/except.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/thread.h>
@@ -13,6 +14,10 @@
#include <thread>
#include <vector>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
#define ZEN_USE_WINDOWS_THREADPOOL 1
#if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL
@@ -41,18 +46,23 @@ namespace {
struct WorkerThreadPool::Impl
{
+ const int m_ThreadCount = 0;
PTP_POOL m_ThreadPool = nullptr;
PTP_CLEANUP_GROUP m_CleanupGroup = nullptr;
TP_CALLBACK_ENVIRON m_CallbackEnvironment;
PTP_WORK m_Work = nullptr;
- std::string m_WorkerThreadBaseName;
- std::atomic<int> m_WorkerThreadCounter{0};
+ std::string m_WorkerThreadBaseName;
+ std::atomic<size_t> m_WorkerThreadCounter{0};
+ std::atomic<int> m_FreeWorkerCount{0};
- RwLock m_QueueLock;
+ mutable RwLock m_QueueLock;
std::deque<Ref<IWork>> m_WorkQueue;
- Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName)
+ Impl(int InThreadCount, std::string_view WorkerThreadBaseName)
+ : m_ThreadCount(InThreadCount)
+ , m_WorkerThreadBaseName(WorkerThreadBaseName)
+ , m_FreeWorkerCount(m_ThreadCount)
{
// Thread pool setup
@@ -62,11 +72,11 @@ struct WorkerThreadPool::Impl
ThrowLastError("CreateThreadpool failed");
}
- if (!SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount))
+ if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount))
{
ThrowLastError("SetThreadpoolThreadMinimum failed");
}
- SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2);
+ SetThreadpoolThreadMaximum(m_ThreadPool, (DWORD)m_ThreadCount);
InitializeThreadpoolEnvironment(&m_CallbackEnvironment);
@@ -93,12 +103,29 @@ struct WorkerThreadPool::Impl
CloseThreadpool(m_ThreadPool);
}
- void ScheduleWork(Ref<IWork> Work)
+ [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode)
{
- m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); });
+ if (Mode == WorkerThreadPool::EMode::DisableBacklog)
+ {
+ if (m_FreeWorkerCount <= 0)
+ {
+ return Work;
+ }
+ RwLock::ExclusiveLockScope _(m_QueueLock);
+ const int QueuedCount = gsl::narrow<int>(m_WorkQueue.size());
+ if (QueuedCount >= m_FreeWorkerCount)
+ {
+ return Work;
+ }
+ m_WorkQueue.push_back(std::move(Work));
+ }
+ else
+ {
+ m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); });
+ }
SubmitThreadpoolWork(m_Work);
+ return {};
}
- [[nodiscard]] size_t PendingWorkItemCount() const { return 0; }
static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work)
{
@@ -109,10 +136,13 @@ struct WorkerThreadPool::Impl
void DoWork()
{
+ m_FreeWorkerCount--;
+ auto _ = MakeGuard([&]() { m_FreeWorkerCount++; });
+
if (!t_IsThreadNamed)
{
t_IsThreadNamed = true;
- const int ThreadIndex = ++m_WorkerThreadCounter;
+ const size_t ThreadIndex = ++m_WorkerThreadCounter;
zen::ExtendableStringBuilder<128> ThreadName;
ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex;
SetCurrentThreadName(ThreadName);
@@ -121,7 +151,7 @@ struct WorkerThreadPool::Impl
Ref<IWork> WorkFromQueue;
{
- RwLock::ExclusiveLockScope _{m_QueueLock};
+ RwLock::ExclusiveLockScope __{m_QueueLock};
WorkFromQueue = std::move(m_WorkQueue.front());
m_WorkQueue.pop_front();
}
@@ -141,20 +171,25 @@ struct WorkerThreadPool::ThreadStartInfo
struct WorkerThreadPool::Impl
{
+ const int m_ThreadCount = 0;
void WorkerThreadFunction(ThreadStartInfo Info);
std::string m_WorkerThreadBaseName;
std::vector<std::thread> m_WorkerThreads;
BlockingQueue<Ref<IWork>> m_WorkQueue;
+ std::atomic<int> m_FreeWorkerCount{0};
- Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName)
+ Impl(int InThreadCount, std::string_view WorkerThreadBaseName)
+ : m_ThreadCount(InThreadCount)
+ , m_WorkerThreadBaseName(WorkerThreadBaseName)
+ , m_FreeWorkerCount(m_ThreadCount)
{
# if ZEN_WITH_TRACE
trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str());
# endif
- zen::Latch WorkerLatch{InThreadCount};
+ zen::Latch WorkerLatch{m_ThreadCount};
- for (int i = 0; i < InThreadCount; ++i)
+ for (int i = 0; i < m_ThreadCount; ++i)
{
m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch});
}
@@ -181,8 +216,23 @@ struct WorkerThreadPool::Impl
m_WorkerThreads.clear();
}
- void ScheduleWork(Ref<IWork> Work) { m_WorkQueue.Enqueue(std::move(Work)); }
- [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); }
+ [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode)
+ {
+ if (Mode == WorkerThreadPool::EMode::DisableBacklog)
+ {
+ if (m_FreeWorkerCount <= 0)
+ {
+ return Work;
+ }
+ const int QueuedCount = gsl::narrow<int>(m_WorkQueue.Size());
+ if (QueuedCount >= m_FreeWorkerCount)
+ {
+ return Work;
+ }
+ }
+ m_WorkQueue.Enqueue(std::move(Work));
+ return {};
+ }
};
void
@@ -197,21 +247,23 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info)
Ref<IWork> Work;
if (m_WorkQueue.WaitAndDequeue(Work))
{
+ m_FreeWorkerCount--;
+ auto _ = MakeGuard([&]() { m_FreeWorkerCount++; });
+
try
{
ZEN_TRACE_CPU_FLUSH("AsyncWork");
Work->Execute();
+ Work = {};
}
catch (const AssertException& Ex)
{
- Work->m_Exception = std::current_exception();
-
+ Work = {};
ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription());
}
catch (const std::exception& e)
{
- Work->m_Exception = std::current_exception();
-
+ Work = {};
ZEN_WARN("Caught exception in worker thread: {}", e.what());
}
}
@@ -243,48 +295,38 @@ WorkerThreadPool::~WorkerThreadPool()
}
void
-WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
+WorkerThreadPool::ScheduleWork(Ref<IWork> Work, EMode Mode)
{
if (m_Impl)
{
- m_Impl->ScheduleWork(std::move(Work));
- }
- else
- {
- try
+ if (Work = m_Impl->ScheduleWork(std::move(Work), Mode); !Work)
{
- ZEN_TRACE_CPU_FLUSH("SyncWork");
- Work->Execute();
- }
- catch (const AssertException& Ex)
- {
- Work->m_Exception = std::current_exception();
-
- ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription());
+ return;
}
- catch (const std::exception& e)
- {
- Work->m_Exception = std::current_exception();
+ }
- ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what());
- }
+ try
+ {
+ ZEN_TRACE_CPU_FLUSH("SyncWork");
+ Work->Execute();
+ Work = {};
+ }
+ catch (const AssertException& Ex)
+ {
+ Work = {};
+ ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription());
+ }
+ catch (const std::exception& e)
+ {
+ Work = {};
+ ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what());
}
}
void
-WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
-{
- ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work))));
-}
-
-[[nodiscard]] size_t
-WorkerThreadPool::PendingWorkItemCount() const
+WorkerThreadPool::ScheduleWork(std::function<void()>&& Work, EMode Mode)
{
- if (m_Impl)
- {
- return m_Impl->PendingWorkItemCount();
- }
- return 0;
+ ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work))), Mode);
}
//////////////////////////////////////////////////////////////////////////
@@ -302,9 +344,10 @@ TEST_CASE("threadpool.basic")
{
WorkerThreadPool Threadpool{1};
- auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }});
- auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }});
- auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }});
+ auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }}, WorkerThreadPool::EMode::EnableBacklog);
+ auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }}, WorkerThreadPool::EMode::EnableBacklog);
+ auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }},
+ WorkerThreadPool::EMode::EnableBacklog);
CHECK_EQ(Future42.get(), 42);
CHECK_EQ(Future99.get(), 99);
diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp
index 51e06ae14..5a6232318 100644
--- a/src/zencore/zencore.cpp
+++ b/src/zencore/zencore.cpp
@@ -350,9 +350,10 @@ TEST_CASE("Assert.Callstack")
WorkerThreadPool Pool(1);
auto Task = Pool.EnqueueTask(std::packaged_task<int()>{[] {
- ZEN_ASSERT(false);
- return 1;
- }});
+ ZEN_ASSERT(false);
+ return 1;
+ }},
+ WorkerThreadPool::EMode::EnableBacklog);
try
{
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index e57fa8a30..9bbfef255 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -835,7 +835,7 @@ HttpAsyncWorkRequest::IssueRequest(std::error_code& ErrorCode)
ZEN_TRACE_CPU("httpsys::AsyncWork::IssueRequest");
ErrorCode.clear();
- Transaction().Server().WorkPool().ScheduleWork(m_WorkItem);
+ Transaction().Server().WorkPool().ScheduleWork(m_WorkItem, WorkerThreadPool::EMode::EnableBacklog);
}
HttpSysRequestHandler*
diff --git a/src/zenhttp/transports/winsocktransport.cpp b/src/zenhttp/transports/winsocktransport.cpp
index 8c82760bb..c06a50c95 100644
--- a/src/zenhttp/transports/winsocktransport.cpp
+++ b/src/zenhttp/transports/winsocktransport.cpp
@@ -304,18 +304,20 @@ SocketTransportPluginImpl::Initialize(TransportServer* ServerInterface)
TransportServerConnection* ConnectionInterface{m_ServerInterface->CreateConnectionHandler(Connection)};
Connection->Initialize(ConnectionInterface, ClientSocket);
- m_WorkerThreadpool->ScheduleWork([Connection] {
- try
- {
- Connection->HandleConnection();
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("exception caught in connection loop: {}", Ex.what());
- }
-
- delete Connection;
- });
+ m_WorkerThreadpool->ScheduleWork(
+ [Connection] {
+ try
+ {
+ Connection->HandleConnection();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("exception caught in connection loop: {}", Ex.what());
+ }
+
+ delete Connection;
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
else
{
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 77ed87cb1..79e5db554 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -2142,20 +2142,23 @@ TEST_CASE("zcache.failing.upstream")
for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
{
size_t Iteration = I;
- Pool.ScheduleWork([&] {
- std::vector<CacheKey> NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest);
- if (NewKeys.size() != RecordsPerRequest)
- {
- ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration);
+ Pool.ScheduleWork(
+ [&] {
+ std::vector<CacheKey> NewKeys =
+ PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest);
+ if (NewKeys.size() != RecordsPerRequest)
+ {
+ ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration);
+ Completed.fetch_add(1);
+ return;
+ }
+ {
+ RwLock::ExclusiveLockScope _(KeysLock);
+ Keys[Iteration].swap(NewKeys);
+ }
Completed.fetch_add(1);
- return;
- }
- {
- RwLock::ExclusiveLockScope _(KeysLock);
- Keys[Iteration].swap(NewKeys);
- }
- Completed.fetch_add(1);
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
bool UseUpstream1 = false;
while (Completed < ThreadCount * KeyMultiplier)
@@ -2206,48 +2209,50 @@ TEST_CASE("zcache.failing.upstream")
Completed.fetch_add(1);
continue;
}
- Pool.ScheduleWork([&] {
- GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy);
+ Pool.ScheduleWork(
+ [&] {
+ GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy);
- if (!Result.Success)
- {
- ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration);
- Completed.fetch_add(1);
- return;
- }
-
- if (Result.Result.Results.size() != LocalKeys.size())
- {
- ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration);
- Completed.fetch_add(1);
- return;
- }
- for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
- {
- const CacheKey& ExpectedKey = LocalKeys[Index++];
- if (!Record)
- {
- continue;
- }
- if (Record->Key != ExpectedKey)
+ if (!Result.Success)
{
- continue;
+ ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration);
+ Completed.fetch_add(1);
+ return;
}
- if (Record->Values.size() != 1)
+
+ if (Result.Result.Results.size() != LocalKeys.size())
{
- continue;
+ ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration);
+ Completed.fetch_add(1);
+ return;
}
-
- for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
{
- if (!Value.Body)
+ const CacheKey& ExpectedKey = LocalKeys[Index++];
+ if (!Record)
{
continue;
}
+ if (Record->Key != ExpectedKey)
+ {
+ continue;
+ }
+ if (Record->Values.size() != 1)
+ {
+ continue;
+ }
+
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ if (!Value.Body)
+ {
+ continue;
+ }
+ }
}
- }
- Completed.fetch_add(1);
- });
+ Completed.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (Completed < ThreadCount * KeyMultiplier)
{
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index f6ad14422..97522e892 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -73,7 +73,7 @@ GetStatsForDirectory(std::filesystem::path Dir)
GetDirectoryContent(Dir,
DirectoryContentFlags::IncludeAllEntries | DirectoryContentFlags::IncludeFileSizes,
DirTraverser,
- GetSmallWorkerPool(EWorkloadType::Burst),
+ GetSmallWorkerPool(EWorkloadType::Background),
PendingWorkCount);
PendingWorkCount.CountDown();
PendingWorkCount.Wait();
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 08d0b12a7..c83065506 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -1646,7 +1646,7 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
ZEN_INFO("Replaying {} requests", RequestCount);
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 322af5e69..7cb115110 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1704,7 +1704,7 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir,
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
@@ -2249,7 +2249,7 @@ ProjectStore::Oplog::IterateChunks(const std::filesystem::path& P
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
@@ -4122,21 +4122,24 @@ ProjectStore::Flush()
WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (const Ref<Project>& Project : Projects)
{
- Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) {
- try
- {
- Project->Flush();
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what());
- }
- });
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Project](std::atomic<bool>&) {
+ try
+ {
+ Project->Flush();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what());
+ }
+ },
+ 0);
}
}
catch (const std::exception& Ex)
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 2a81ee3e3..feafcc810 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -245,101 +245,105 @@ namespace remotestore_impl {
const std::vector<IoHash>& Chunks)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &WorkerPool,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &RemoteResult,
- Chunks = Chunks,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
- if (Result.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to load attachments with {} chunks ({}): {}",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (IgnoreMissingAttachments)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- }
- return;
- }
- Info.AttachmentsDownloaded.fetch_add(Chunks.size());
- ZEN_INFO("Loaded {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ NetworkWorkerPool.ScheduleWork(
+ [&RemoteStore,
+ &ChunkStore,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &RemoteResult,
+ Chunks = Chunks,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
}
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ if (Result.ErrorCode)
{
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to load attachments with {} chunks ({}): {}",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ }
return;
}
- if (!Chunks.empty())
+ Info.AttachmentsDownloaded.fetch_add(Chunks.size());
+ ZEN_INFO("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ if (RemoteResult.IsError())
{
- try
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
- WriteAttachmentBuffers.reserve(Chunks.size());
- WriteRawHashes.reserve(Chunks.size());
-
- for (const auto& It : Chunks)
+ return;
+ }
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(It.first);
+ return;
}
- std::vector<CidStore::InsertResult> InsertResults =
- ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
-
- for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ if (!Chunks.empty())
{
- if (InsertResults[Index].New)
+ try
{
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(Chunks.size());
+ WriteRawHashes.reserve(Chunks.size());
+
+ for (const auto& It : Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(It.first);
+ }
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to bulk save {} attachments", Chunks.size()),
+ Ex.what());
}
}
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk save {} attachments", Chunks.size()),
- Ex.what());
- }
- }
- });
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk load {} attachments", Chunks.size()),
- Ex.what());
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to bulk load {} attachments", Chunks.size()),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
};
void DownloadAndSaveBlock(CidStore& ChunkStore,
@@ -359,226 +363,237 @@ namespace remotestore_impl {
uint32_t RetriesLeft)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
- BlockHash,
- &RemoteResult,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
- RetriesLeft,
- Chunks = Chunks]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
- if (BlockResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment {} ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- }
- return;
- }
+ NetworkWorkerPool.ScheduleWork(
+ [&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &ChunkStore,
+ &RemoteStore,
+ &NetworkWorkerPool,
+ &WorkerPool,
+ BlockHash,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ RetriesLeft,
+ Chunks = Chunks]() {
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
}
- uint64_t BlockSize = BlockResult.Bytes.GetSize();
- Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {} ({})",
- BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
- NiceBytes(BlockSize));
- Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
- BlockHash,
- &RemoteResult,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
- RetriesLeft,
- Chunks = Chunks,
- Bytes = std::move(BlockResult.Bytes)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode)
{
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download block attachment {} ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ }
return;
}
- try
+ if (RemoteResult.IsError())
{
- ZEN_ASSERT(Bytes.Size() > 0);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
-
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize);
- if (!Compressed)
- {
- if (RetriesLeft > 0)
+ return;
+ }
+ uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ Info.AttachmentBlocksDownloaded.fetch_add(1);
+ ZEN_INFO("Loaded block attachment '{}' in {} ({})",
+ BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlockSize));
+ Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &ChunkStore,
+ &RemoteStore,
+ &NetworkWorkerPool,
+ &WorkerPool,
+ BlockHash,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ RetriesLeft,
+ Chunks = Chunks,
+ Bytes = std::move(BlockResult.Bytes)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- ReportMessage(
- OptionalContext,
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary, retrying download",
- BlockHash));
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- std::move(Chunks),
- RetriesLeft - 1);
+ return;
}
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
- {});
- return;
- }
- SharedBuffer BlockPayload = Compressed.Decompress();
- if (!BlockPayload)
- {
- if (RetriesLeft > 0)
+ try
{
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download",
- BlockHash));
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- std::move(Chunks),
- RetriesLeft - 1);
- }
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash),
- {});
- return;
- }
- if (RawHash != BlockHash)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
- {});
- return;
- }
+ ZEN_ASSERT(Bytes.Size() > 0);
+ std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
+ WantedChunks.reserve(Chunks.size());
+ WantedChunks.insert(Chunks.begin(), Chunks.end());
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize);
+ if (!Compressed)
+ {
+ if (RetriesLeft > 0)
+ {
+ ReportMessage(
+ OptionalContext,
+ fmt::format(
+ "Block attachment {} is malformed, can't parse as compressed binary, retrying download",
+ BlockHash));
+ return DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockHash,
+ std::move(Chunks),
+ RetriesLeft - 1);
+ }
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
+ {});
+ return;
+ }
+ SharedBuffer BlockPayload = Compressed.Decompress();
+ if (!BlockPayload)
+ {
+ if (RetriesLeft > 0)
+ {
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download",
+ BlockHash));
+ return DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockHash,
+ std::move(Chunks),
+ RetriesLeft - 1);
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash));
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash),
+ {});
+ return;
+ }
+ if (RawHash != BlockHash)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
+ {});
+ return;
+ }
- uint64_t BlockHeaderSize = 0;
- bool StoreChunksOK = IterateChunkBlock(
- BlockPayload,
- [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
- const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- IoHash RawHash;
- uint64_t RawSize;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
- ZEN_ASSERT(RawHash == AttachmentRawHash);
- WriteRawHashes.emplace_back(AttachmentRawHash);
- WantedChunks.erase(AttachmentRawHash);
- }
- },
- BlockHeaderSize);
-
- if (!StoreChunksOK)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Invalid format for block {}", BlockHash),
- {});
- return;
- }
+ uint64_t BlockHeaderSize = 0;
+ bool StoreChunksOK = IterateChunkBlock(
+ BlockPayload,
+ [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
+ const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
+ RawHash,
+ RawSize));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ },
+ BlockHeaderSize);
+
+ if (!StoreChunksOK)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has invalid format ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Invalid format for block {}", BlockHash),
+ {});
+ return;
+ }
- ZEN_ASSERT(WantedChunks.empty());
+ ZEN_ASSERT(WantedChunks.empty());
- if (!WriteAttachmentBuffers.empty())
- {
- auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (size_t Index = 0; Index < Results.size(); Index++)
- {
- const auto& Result = Results[Index];
- if (Result.New)
+ if (!WriteAttachmentBuffers.empty())
{
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
+ auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const auto& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
}
}
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed save block attachment {}", BlockHash),
- Ex.what());
- }
- });
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to block attachment {}", BlockHash),
- Ex.what());
- }
- });
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed save block attachment {}", BlockHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to block attachment {}", BlockHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
};
void DownloadAndSaveAttachment(CidStore& ChunkStore,
@@ -596,92 +611,96 @@ namespace remotestore_impl {
const IoHash& RawHash)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &WorkerPool,
- &RemoteResult,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- RawHash,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- &Info,
- IgnoreMissingAttachments,
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download large attachment {}: '{}', error code : {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
- }
- return;
- }
- uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_INFO("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
- Info.AttachmentsDownloaded.fetch_add(1);
+ NetworkWorkerPool.ScheduleWork(
+ [&RemoteStore,
+ &ChunkStore,
+ &WorkerPool,
+ &RemoteResult,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ RawHash,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
}
- Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
- &RemoteResult,
- &Info,
- &ChunkStore,
- RawHash,
- AttachmentSize,
- Bytes = std::move(AttachmentResult.Bytes),
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
{
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
- if (InsertResult.New)
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download large attachment {}: '{}', error code : {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
{
- Info.AttachmentBytesStored.fetch_add(AttachmentSize);
- Info.AttachmentsStored.fetch_add(1);
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
}
+ return;
}
- catch (const std::exception& Ex)
+ uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
+ ZEN_INFO("Loaded large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(AttachmentSize));
+ Info.AttachmentsDownloaded.fetch_add(1);
+ if (RemoteResult.IsError())
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Saving attachment {} failed", RawHash),
- Ex.what());
+ return;
}
- });
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Loading attachment {} failed", RawHash),
- Ex.what());
- }
- });
+ Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&AttachmentsWriteLatch,
+ &RemoteResult,
+ &Info,
+ &ChunkStore,
+ RawHash,
+ AttachmentSize,
+ Bytes = std::move(AttachmentResult.Bytes),
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Saving attachment {} failed", RawHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Loading attachment {} failed", RawHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
};
void CreateBlock(WorkerThreadPool& WorkerPool,
@@ -694,45 +713,47 @@ namespace remotestore_impl {
AsyncRemoteResult& RemoteResult)
{
OpSectionsLatch.AddCount(1);
- WorkerPool.ScheduleWork([&Blocks,
- &SectionsLock,
- &OpSectionsLatch,
- BlockIndex,
- Chunks = std::move(ChunksInBlock),
- &AsyncOnBlock,
- &RemoteResult]() mutable {
- auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- size_t ChunkCount = Chunks.size();
- try
- {
- ZEN_ASSERT(ChunkCount > 0);
- Stopwatch Timer;
- ChunkBlockDescription Block;
- CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block);
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ WorkerPool.ScheduleWork(
+ [&Blocks,
+ &SectionsLock,
+ &OpSectionsLatch,
+ BlockIndex,
+ Chunks = std::move(ChunksInBlock),
+ &AsyncOnBlock,
+ &RemoteResult]() mutable {
+ auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex] = Block;
+ return;
}
- uint64_t BlockSize = CompressedBlock.GetCompressedSize();
- AsyncOnBlock(std::move(CompressedBlock), std::move(Block));
- ZEN_INFO("Generated block with {} attachments in {} ({})",
- ChunkCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- NiceBytes(BlockSize));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount),
- Ex.what());
- }
- });
+ size_t ChunkCount = Chunks.size();
+ try
+ {
+ ZEN_ASSERT(ChunkCount > 0);
+ Stopwatch Timer;
+ ChunkBlockDescription Block;
+ CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block);
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope __(SectionsLock);
+ Blocks[BlockIndex] = Block;
+ }
+ uint64_t BlockSize = CompressedBlock.GetCompressedSize();
+ AsyncOnBlock(std::move(CompressedBlock), std::move(Block));
+ ZEN_INFO("Generated block with {} attachments in {} ({})",
+ ChunkCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ NiceBytes(BlockSize));
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
struct UploadInfo
@@ -861,89 +882,91 @@ namespace remotestore_impl {
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
- WorkerPool.ScheduleWork([&ChunkStore,
- &RemoteStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- RawHash,
- &CreatedBlocks,
- &LooseFileAttachments,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- IoBuffer Payload;
- ChunkBlockDescription Block;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = BlockIt->second.Payload;
- Block = BlockIt->second.Block;
- }
- else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
- {
- Payload = LooseTmpFileIt->second(RawHash);
- }
- else
- {
- Payload = ChunkStore.FindChunkByCid(RawHash);
- }
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_WARN("Failed to save attachment '{}' ({}): {}",
- RawHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- const bool IsBlock = Block.BlockHash == RawHash;
- size_t PayloadSize = Payload.GetSize();
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block));
- if (Result.ErrorCode)
+ WorkerPool.ScheduleWork(
+ [&ChunkStore,
+ &RemoteStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ RawHash,
+ &CreatedBlocks,
+ &LooseFileAttachments,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): {}",
- RawHash,
- NiceBytes(PayloadSize),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
return;
}
- if (IsBlock)
+ try
{
- Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved block attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
+ IoBuffer Payload;
+ ChunkBlockDescription Block;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = BlockIt->second.Payload;
+ Block = BlockIt->second.Block;
+ }
+ else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
+ {
+ Payload = LooseTmpFileIt->second(RawHash);
+ }
+ else
+ {
+ Payload = ChunkStore.FindChunkByCid(RawHash);
+ }
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_WARN("Failed to save attachment '{}' ({}): {}",
+ RawHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ const bool IsBlock = Block.BlockHash == RawHash;
+ size_t PayloadSize = Payload.GetSize();
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block));
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachment '{}', {} ({}): {}",
+ RawHash,
+ NiceBytes(PayloadSize),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
+ }
+ if (IsBlock)
+ {
+ Info.AttachmentBlocksUploaded.fetch_add(1);
+ Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved block attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
+ else
+ {
+ Info.AttachmentsUploaded.fetch_add(1);
+ Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
}
- else
+ catch (const std::exception& Ex)
{
- Info.AttachmentsUploaded.fetch_add(1);
- Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+                                          fmt::format("Failed to upload attachment {}", RawHash),
+ Ex.what());
}
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("To upload attachment {}", RawHash),
- Ex.what());
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
if (IsCancelled(OptionalContext))
@@ -982,66 +1005,68 @@ namespace remotestore_impl {
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
- WorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- NeededChunks = std::move(NeededChunks),
- &BulkBlockAttachmentsToUpload,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- size_t ChunksSize = 0;
- std::vector<SharedBuffer> ChunkBuffers;
- ChunkBuffers.reserve(NeededChunks.size());
- for (const IoHash& Chunk : NeededChunks)
+ WorkerPool.ScheduleWork(
+ [&RemoteStore,
+ &ChunkStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ NeededChunks = std::move(NeededChunks),
+ &BulkBlockAttachmentsToUpload,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- auto It = BulkBlockAttachmentsToUpload.find(Chunk);
- ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
- CompressedBuffer ChunkPayload = It->second(It->first).second;
- if (!ChunkPayload)
+ return;
+ }
+ try
+ {
+ size_t ChunksSize = 0;
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
+ {
+ auto It = BulkBlockAttachmentsToUpload.find(Chunk);
+ ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
+ CompressedBuffer ChunkPayload = It->second(It->first).second;
+ if (!ChunkPayload)
+ {
+ RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Missing chunk {}"sv, Chunk),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+ ChunkBuffers.clear();
+ break;
+ }
+ ChunksSize += ChunkPayload.GetCompressedSize();
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer()));
+ }
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
{
- RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
- fmt::format("Missing chunk {}"sv, Chunk),
- fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
- ChunkBuffers.clear();
- break;
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachments with {} chunks ({}): {}",
+ NeededChunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
}
- ChunksSize += ChunkPayload.GetCompressedSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer()));
+ Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
+ Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
+
+ ZEN_INFO("Saved {} bulk attachments in {} ({})",
+ NeededChunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(ChunksSize));
}
- RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
- if (Result.ErrorCode)
+ catch (const std::exception& Ex)
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): {}",
- NeededChunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+                                          fmt::format("Failed to bulk upload {} attachments", NeededChunks.size()),
+ Ex.what());
}
- Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
- Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
-
- ZEN_INFO("Saved {} bulk attachments in {} ({})",
- NeededChunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(ChunksSize));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to buck upload {} attachments", NeededChunks.size()),
- Ex.what());
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
}
@@ -1516,148 +1541,152 @@ BuildContainer(CidStore& ChunkStore,
ResolveAttachmentsLatch.AddCount(1);
- WorkerPool.ScheduleWork([&ChunkStore,
- UploadAttachment = &It.second,
- RawHash = It.first,
- &ResolveAttachmentsLatch,
- &ResolveLock,
- &ChunkedHashes,
- &LargeChunkHashes,
- &ChunkedUploadAttachments,
- &LooseUploadAttachments,
- &MissingHashes,
- &OnLargeAttachment,
- &AttachmentTempPath,
- &ChunkFile,
- &ChunkedFiles,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- AllowChunking,
- &RemoteResult,
- OptionalContext]() {
- auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return;
- }
+ WorkerPool.ScheduleWork(
+ [&ChunkStore,
+ UploadAttachment = &It.second,
+ RawHash = It.first,
+ &ResolveAttachmentsLatch,
+ &ResolveLock,
+ &ChunkedHashes,
+ &LargeChunkHashes,
+ &ChunkedUploadAttachments,
+ &LooseUploadAttachments,
+ &MissingHashes,
+ &OnLargeAttachment,
+ &AttachmentTempPath,
+ &ChunkFile,
+ &ChunkedFiles,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ AllowChunking,
+ &RemoteResult,
+ OptionalContext]() {
+ auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return;
+ }
- try
- {
- if (!UploadAttachment->RawPath.empty())
+ try
{
- const std::filesystem::path& FilePath = UploadAttachment->RawPath;
- IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
- if (RawData)
+ if (!UploadAttachment->RawPath.empty())
{
- if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit)
+ const std::filesystem::path& FilePath = UploadAttachment->RawPath;
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
+ if (RawData)
{
- IoBufferFileReference FileRef;
- (void)RawData.GetFileReference(FileRef);
-
- ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext);
- ResolveLock.WithExclusiveLock(
- [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
- ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
- ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
- for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
- {
- ChunkedHashes.insert(ChunkHash);
- }
- ChunkedFiles.emplace_back(std::move(Chunked));
+ if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit)
+ {
+ IoBufferFileReference FileRef;
+ (void)RawData.GetFileReference(FileRef);
+
+ ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext);
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
+ ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
+ ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
+ {
+ ChunkedHashes.insert(ChunkHash);
+ }
+ ChunkedFiles.emplace_back(std::move(Chunked));
+ });
+ }
+ else if (RawData.GetSize() > (MaxChunkEmbedSize * 2))
+ {
+ // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
+ // it will be a loose attachment instead of going into a block
+ OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) {
+ size_t RawSize = RawData.GetSize();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+
+ std::filesystem::path AttachmentPath = AttachmentTempPath;
+ AttachmentPath.append(RawHash.ToHexString());
+ IoBuffer TempAttachmentBuffer =
+ WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
+ ZEN_INFO("Saved temp attachment to '{}', {} ({})",
+ AttachmentPath,
+ NiceBytes(RawSize),
+ NiceBytes(TempAttachmentBuffer.GetSize()));
+ return TempAttachmentBuffer;
});
- }
- else if (RawData.GetSize() > (MaxChunkEmbedSize * 2))
- {
- // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
- // it will be a loose attachment instead of going into a block
- OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) {
- size_t RawSize = RawData.GetSize();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)),
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ uint64_t RawSize = RawData.GetSize();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
OodleCompressor::Mermaid,
OodleCompressionLevel::VeryFast);
std::filesystem::path AttachmentPath = AttachmentTempPath;
AttachmentPath.append(RawHash.ToHexString());
+
+ uint64_t CompressedSize = Compressed.GetCompressedSize();
IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
ZEN_INFO("Saved temp attachment to '{}', {} ({})",
AttachmentPath,
NiceBytes(RawSize),
NiceBytes(TempAttachmentBuffer.GetSize()));
- return TempAttachmentBuffer;
- });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+
+ if (CompressedSize > MaxChunkEmbedSize)
+ {
+ OnLargeAttachment(RawHash,
+ [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ UploadAttachment->Size = CompressedSize;
+ ResolveLock.WithExclusiveLock(
+ [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
+ LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
+ });
+ }
+ }
}
else
{
- uint64_t RawSize = RawData.GetSize();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::VeryFast);
-
- std::filesystem::path AttachmentPath = AttachmentTempPath;
- AttachmentPath.append(RawHash.ToHexString());
-
- uint64_t CompressedSize = Compressed.GetCompressedSize();
- IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
- ZEN_INFO("Saved temp attachment to '{}', {} ({})",
- AttachmentPath,
- NiceBytes(RawSize),
- NiceBytes(TempAttachmentBuffer.GetSize()));
-
- if (CompressedSize > MaxChunkEmbedSize)
- {
- OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
- {
- UploadAttachment->Size = CompressedSize;
- ResolveLock.WithExclusiveLock(
- [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
- LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
- });
- }
+ ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
}
}
else
{
- ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
- }
- }
- else
- {
- IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
- if (Data)
- {
- auto GetForChunking =
- [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool {
- if (Data.IsWholeFile())
- {
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
- if (Compressed)
+ IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
+ if (Data)
+ {
+ auto GetForChunking =
+ [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool {
+ if (Data.IsWholeFile())
{
- if (VerifyRawSize > ChunkFileSizeLimit)
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
+ if (Compressed)
{
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize;
- if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ if (VerifyRawSize > ChunkFileSizeLimit)
{
- if (CompressionLevel == OodleCompressionLevel::None)
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize;
+ if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
{
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- if (Decompressed)
+ if (CompressionLevel == OodleCompressionLevel::None)
{
- std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
- if (Segments.size() == 1)
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ if (Decompressed)
{
- IoBuffer DecompressedData = Segments[0].AsIoBuffer();
- if (DecompressedData.GetFileReference(OutFileRef))
+ std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
+ if (Segments.size() == 1)
{
- return true;
+ IoBuffer DecompressedData = Segments[0].AsIoBuffer();
+ if (DecompressedData.GetFileReference(OutFileRef))
+ {
+ return true;
+ }
}
}
}
@@ -1665,49 +1694,49 @@ BuildContainer(CidStore& ChunkStore,
}
}
}
- }
- return false;
- };
+ return false;
+ };
- IoBufferFileReference FileRef;
- if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef))
- {
- ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext);
- ResolveLock.WithExclusiveLock(
- [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
- ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
- ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
- for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
- {
- ChunkedHashes.insert(ChunkHash);
- }
- ChunkedFiles.emplace_back(std::move(Chunked));
- });
- }
- else if (Data.GetSize() > MaxChunkEmbedSize)
- {
- OnLargeAttachment(RawHash,
- [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ IoBufferFileReference FileRef;
+ if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef))
+ {
+ ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext);
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
+ ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
+ ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
+ {
+ ChunkedHashes.insert(ChunkHash);
+ }
+ ChunkedFiles.emplace_back(std::move(Chunked));
+ });
+ }
+ else if (Data.GetSize() > MaxChunkEmbedSize)
+ {
+ OnLargeAttachment(RawHash,
+ [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ UploadAttachment->Size = Data.GetSize();
+ }
}
else
{
- UploadAttachment->Size = Data.GetSize();
+ ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
}
}
- else
- {
- ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
- }
}
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to resolve attachment {}", RawHash),
- Ex.what());
- }
- });
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to resolve attachment {}", RawHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
ResolveAttachmentsLatch.CountDown();
@@ -3077,101 +3106,103 @@ LoadOplog(CidStore& ChunkStore,
{
std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
DechunkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&ChunkStore,
- &DechunkLatch,
- TempFileName,
- &Chunked,
- &RemoteResult,
- IgnoreMissingAttachments,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
- std::error_code Ec;
- if (IsFile(TempFileName, Ec))
- {
- RemoveFile(TempFileName, Ec);
- if (Ec)
+ WorkerPool.ScheduleWork(
+ [&ChunkStore,
+ &DechunkLatch,
+ TempFileName,
+ &Chunked,
+ &RemoteResult,
+ IgnoreMissingAttachments,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
+ std::error_code Ec;
+ if (IsFile(TempFileName, Ec))
{
- ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message());
+ RemoveFile(TempFileName, Ec);
+ if (Ec)
+ {
+ ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message());
+ }
}
- }
- DechunkLatch.CountDown();
- });
- try
- {
- if (RemoteResult.IsError())
- {
- return;
- }
- Stopwatch Timer;
- IoBuffer TmpBuffer;
+ DechunkLatch.CountDown();
+ });
+ try
{
- BasicFile TmpFile;
- TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate);
+ if (RemoteResult.IsError())
{
- BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
-
- uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
- BLAKE3Stream HashingStream;
- for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
+ return;
+ }
+ Stopwatch Timer;
+ IoBuffer TmpBuffer;
+ {
+ BasicFile TmpFile;
+ TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate);
{
- const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
- IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
- if (!Chunk)
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
- // We only add 1 as the resulting missing count will be 1 for the dechunked file
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
+ BLAKE3Stream HashingStream;
+ for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
+ {
+ const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
+ IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
+ if (!Chunk)
{
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
- "Missing chunk",
+ remotestore_impl::ReportMessage(
+ OptionalContext,
fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ }
+ return;
+ }
+ CompositeBuffer Decompressed =
+ CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
+ for (const SharedBuffer& Segment : Decompressed.GetSegments())
+ {
+ MemoryView SegmentData = Segment.GetView();
+ HashingStream.Append(SegmentData);
+ TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
+ Offset += SegmentData.GetSize();
}
- return;
- }
- CompositeBuffer Decompressed =
- CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
- for (const SharedBuffer& Segment : Decompressed.GetSegments())
- {
- MemoryView SegmentData = Segment.GetView();
- HashingStream.Append(SegmentData);
- TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
- Offset += SegmentData.GetSize();
}
+ BLAKE3 RawHash = HashingStream.GetHash();
+ ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
+ UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash);
+ TmpWriter.Write(Header.GetData(), Header.GetSize(), 0);
}
- BLAKE3 RawHash = HashingStream.GetHash();
- ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
- UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash);
- TmpWriter.Write(Header.GetData(), Header.GetSize(), 0);
+ TmpFile.Close();
+ TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName);
}
- TmpFile.Close();
- TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName);
+ CidStore::InsertResult InsertResult =
+ ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+
+ ZEN_INFO("Dechunked attachment {} ({}) in {}",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
- if (InsertResult.New)
+ catch (const std::exception& Ex)
{
- Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
- Info.AttachmentsStored.fetch_add(1);
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+                                          fmt::format("Failed to dechunk file {}", Chunked.RawHash),
+ Ex.what());
}
-
- ZEN_INFO("Dechunked attachment {} ({}) in {}",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to dechunck file {}", Chunked.RawHash),
- Ex.what());
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
DechunkLatch.CountDown();
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 7b56c64bd..c50f2bb13 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1686,80 +1686,82 @@ TEST_CASE("blockstore.iterate.chunks")
Latch WorkLatch(1);
Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- bool Continue = Store.IterateBlock(
- Locations,
- ChunkIndexes,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
- switch (ChunkIndex)
- {
- case 0:
- CHECK(Data);
- CHECK(Size == FirstChunkData.size());
- CHECK(std::string((const char*)Data, Size) == FirstChunkData);
- break;
- case 1:
- CHECK(Data);
- CHECK(Size == SecondChunkData.size());
- CHECK(std::string((const char*)Data, Size) == SecondChunkData);
- break;
- case 2:
- CHECK(false);
- break;
- case 3:
- CHECK(!Data);
- break;
- case 4:
- CHECK(!Data);
- break;
- case 5:
- CHECK(!Data);
- break;
- default:
- CHECK(false);
- break;
- }
- return true;
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
- switch (ChunkIndex)
- {
- case 0:
- case 1:
- CHECK(false);
- break;
- case 2:
- {
- CHECK(Size == VeryLargeChunk.size());
- char* Buffer = new char[Size];
- size_t HashOffset = 0;
- File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
- memcpy(&Buffer[HashOffset], Data, Size);
- HashOffset += Size;
- });
- CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
- delete[] Buffer;
- }
- break;
- case 3:
- CHECK(false);
- break;
- case 4:
- CHECK(false);
- break;
- case 5:
- CHECK(false);
- break;
- default:
- CHECK(false);
- break;
- }
- return true;
- },
- 0);
- CHECK(Continue);
- });
+ WorkerPool.ScheduleWork(
+ [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ bool Continue = Store.IterateBlock(
+ Locations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
+ switch (ChunkIndex)
+ {
+ case 0:
+ CHECK(Data);
+ CHECK(Size == FirstChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == FirstChunkData);
+ break;
+ case 1:
+ CHECK(Data);
+ CHECK(Size == SecondChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == SecondChunkData);
+ break;
+ case 2:
+ CHECK(false);
+ break;
+ case 3:
+ CHECK(!Data);
+ break;
+ case 4:
+ CHECK(!Data);
+ break;
+ case 5:
+ CHECK(!Data);
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ return true;
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
+ switch (ChunkIndex)
+ {
+ case 0:
+ case 1:
+ CHECK(false);
+ break;
+ case 2:
+ {
+ CHECK(Size == VeryLargeChunk.size());
+ char* Buffer = new char[Size];
+ size_t HashOffset = 0;
+ File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
+ memcpy(&Buffer[HashOffset], Data, Size);
+ HashOffset += Size;
+ });
+ CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
+ delete[] Buffer;
+ }
+ break;
+ case 3:
+ CHECK(false);
+ break;
+ case 4:
+ CHECK(false);
+ break;
+ case 5:
+ CHECK(false);
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ return true;
+ },
+ 0);
+ CHECK(Continue);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
return true;
});
WorkLatch.CountDown();
@@ -1796,11 +1798,15 @@ TEST_CASE("blockstore.thread.read.write")
std::atomic<size_t> WorkCompleted = 0;
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ ChunkLocations[ChunkIndex] = L;
+ });
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1810,13 +1816,15 @@ TEST_CASE("blockstore.thread.read.write")
WorkCompleted = 0;
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1828,20 +1836,24 @@ TEST_CASE("blockstore.thread.read.write")
WorkCompleted = 0;
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
- SecondChunkLocations[ChunkIndex] = L;
- });
- WorkCompleted.fetch_add(1);
- });
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ SecondChunkLocations[ChunkIndex] = L;
+ });
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size() * 2)
{
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index d65c2bf06..4539746ba 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -376,7 +376,7 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
for (size_t Index = 0; Index < Metadatas.size(); Index++)
{
Work.ScheduleWork(
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index cacbbd966..fd52cdab5 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -4215,7 +4215,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (auto& BucketPath : FoundBucketDirectories)
@@ -4387,7 +4387,7 @@ ZenCacheDiskLayer::Flush()
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (auto& Bucket : Buckets)
@@ -4434,7 +4434,8 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
{
# if 1
Results.push_back(Ctx.ThreadPool().EnqueueTask(
- std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
+ std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }},
+ WorkerThreadPool::EMode::EnableBacklog));
# else
CacheBucket& Bucket = *Kv.second;
Bucket.ScrubStorage(Ctx);
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 1f2d6c37f..3f27e6d21 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1574,10 +1574,12 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1612,16 +1614,18 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- std::string Bucket = Chunk.second.Bucket;
- IoHash ChunkHash = Chunk.first;
- ZenCacheValue CacheValue;
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
- IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1655,23 +1659,27 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic_uint32_t AddedChunkCount = 0;
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- }
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ }
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (AddedChunkCount.load() < NewChunks.size())
{
@@ -1710,12 +1718,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < GcChunkHashes.size())
{
@@ -1848,11 +1858,13 @@ TEST_CASE("cachestore.drop.bucket")
CHECK(Value1.Value);
std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- WorkComplete = true;
- });
+ Workers.ScheduleWork(
+ [&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ WorkComplete = true;
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
// On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
// Our DropBucket execution blocks any incoming request from completing until we are done with the drop
CHECK(Zcs.DropBucket(Namespace, Bucket));
@@ -1931,14 +1943,16 @@ TEST_CASE("cachestore.drop.namespace")
CHECK(Value4.Value);
std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- Value2.Value = IoBuffer{};
- Value3.Value = IoBuffer{};
- Value4.Value = IoBuffer{};
- WorkComplete = true;
- });
+ Workers.ScheduleWork(
+ [&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ Value2.Value = IoBuffer{};
+ Value3.Value = IoBuffer{};
+ Value4.Value = IoBuffer{};
+ WorkComplete = true;
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
// On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
// Our DropBucket execution blocks any incoming request from completing until we are done with the drop
CHECK(Zcs.DropNamespace(Namespace1));
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 6b89beb3d..49d24c21e 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -132,13 +132,18 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig)
WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Burst);
std::vector<std::future<void>> Work;
Work.emplace_back(
- WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }}));
- Work.emplace_back(WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() {
- m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
- }}));
- Work.emplace_back(WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() {
- m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
- }}));
+ WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }},
+ WorkerThreadPool::EMode::DisableBacklog));
+ Work.emplace_back(WorkerPool.EnqueueTask(
+ std::packaged_task<void()>{[&]() {
+ m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
+ }},
+ WorkerThreadPool::EMode::DisableBacklog));
+ Work.emplace_back(WorkerPool.EnqueueTask(
+ std::packaged_task<void()>{[&]() {
+ m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
+ }},
+ WorkerThreadPool::EMode::DisableBacklog));
for (std::future<void>& Result : Work)
{
if (Result.valid())
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index b00abb2cb..b7bfbd188 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -412,7 +412,7 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
std::atomic<bool> AbortFlag;
{
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
const bool Continue = m_BlockStore.IterateChunks(
@@ -1491,11 +1491,13 @@ TEST_CASE("compactcas.threadedinsert")
{
const IoHash& Hash = Chunk.first;
const IoBuffer& Buffer = Chunk.second;
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
- ZEN_ASSERT(InsertResult.New);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Buffer, Hash]() {
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1511,13 +1513,15 @@ TEST_CASE("compactcas.threadedinsert")
{
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- IoHash Hash = IoHash::HashBuffer(Buffer);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, &Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ IoHash Hash = IoHash::HashBuffer(Buffer);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1548,23 +1552,27 @@ TEST_CASE("compactcas.threadedinsert")
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Cas.InsertChunk(Chunk.second, Chunk.first);
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Cas.InsertChunk(Chunk.second, Chunk.first);
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- if (Buffer)
- {
- CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
- }
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ if (Buffer)
+ {
+ CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
+ }
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
tsl::robin_set<IoHash, IoHash::Hasher> ChunksToDelete;
@@ -1649,11 +1657,13 @@ TEST_CASE("compactcas.threadedinsert")
WorkCompleted = 0;
for (const IoHash& ChunkHash : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
- CHECK(Cas.HaveChunk(ChunkHash));
- CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, ChunkHash]() {
+ CHECK(Cas.HaveChunk(ChunkHash));
+ CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < GcChunkHashes.size())
{
@@ -1711,7 +1721,8 @@ TEST_CASE("compactcas.restart")
RwLock::ExclusiveLockScope __(InsertLock);
Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
Offset += BatchCount;
}
WorkLatch.CountDown();
@@ -1964,7 +1975,8 @@ TEST_CASE("compactcas.iteratechunks")
RwLock::ExclusiveLockScope __(InsertLock);
Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
}
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
Offset += BatchCount;
}
WorkLatch.CountDown();
@@ -1998,82 +2010,84 @@ TEST_CASE("compactcas.iteratechunks")
for (size_t I = 0; I < 2; I++)
{
WorkLatch.AddCount(1);
- ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- std::vector<IoHash> PartialHashes;
- PartialHashes.reserve(Hashes.size() / 4);
- for (size_t Index = 0; Index < Hashes.size(); Index++)
- {
- size_t TestIndex = Index + I;
- if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1))
+ ThreadPool.ScheduleWork(
+ [&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ std::vector<IoHash> PartialHashes;
+ PartialHashes.reserve(Hashes.size() / 4);
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
{
- PartialHashes.push_back(Hashes[Index]);
+ size_t TestIndex = Index + I;
+ if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1))
+ {
+ PartialHashes.push_back(Hashes[Index]);
+ }
}
- }
- std::reverse(PartialHashes.begin(), PartialHashes.end());
+ std::reverse(PartialHashes.begin(), PartialHashes.end());
- std::vector<IoHash> NoFoundHashes;
- std::vector<size_t> NoFindIndexes;
+ std::vector<IoHash> NoFoundHashes;
+ std::vector<size_t> NoFindIndexes;
- NoFoundHashes.reserve(9);
- for (size_t J = 0; J < 9; J++)
- {
- std::string Data = fmt::format("oh no, we don't exist {}", J + 1);
- NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length()));
- }
-
- NoFindIndexes.reserve(9);
-
- // Sprinkle in chunks that are not found!
- auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
-
- std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size());
- std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size());
-
- CHECK(Cas.IterateChunks(
- PartialHashes,
- [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) {
- CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index));
- uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1);
- CHECK(PreviousCount == 0);
- FoundFlags[Index] = !!Payload;
- const IoHash& Hash = PartialHashes[Index];
- CHECK(Hash == IoHash::HashBuffer(Payload));
- return true;
- },
- &BatchWorkerPool,
- 2048u));
-
- for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++)
- {
- CHECK(FetchedCounts[FoundIndex].load() <= 1);
- if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end())
+ NoFoundHashes.reserve(9);
+ for (size_t J = 0; J < 9; J++)
{
- CHECK(FoundFlags[FoundIndex]);
+ std::string Data = fmt::format("oh no, we don't exist {}", J + 1);
+ NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length()));
}
- else
+
+ NoFindIndexes.reserve(9);
+
+ // Sprinkle in chunks that are not found!
+ auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+
+ std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size());
+ std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size());
+
+ CHECK(Cas.IterateChunks(
+ PartialHashes,
+ [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) {
+ CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index));
+ uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1);
+ CHECK(PreviousCount == 0);
+ FoundFlags[Index] = !!Payload;
+ const IoHash& Hash = PartialHashes[Index];
+ CHECK(Hash == IoHash::HashBuffer(Payload));
+ return true;
+ },
+ &BatchWorkerPool,
+ 2048u));
+
+ for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++)
{
- CHECK(!FoundFlags[FoundIndex]);
+ CHECK(FetchedCounts[FoundIndex].load() <= 1);
+ if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end())
+ {
+ CHECK(FoundFlags[FoundIndex]);
+ }
+ else
+ {
+ CHECK(!FoundFlags[FoundIndex]);
+ }
}
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
WorkLatch.CountDown();
WorkLatch.Wait();
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 68644be2d..365a933c1 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -665,7 +665,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 5023695b2..b08e6a3ca 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -869,7 +869,8 @@ GcManager::CollectGarbage(const GcSettings& Settings)
Ex.what());
SetCancelGC(true);
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
@@ -981,7 +982,8 @@ GcManager::CollectGarbage(const GcSettings& Settings)
SetCancelGC(true);
}
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
@@ -1021,77 +1023,80 @@ GcManager::CollectGarbage(const GcSettings& Settings)
GcReferencer* Referencer = m_GcReferencers[Index];
std::pair<std::string, GcReferencerStats>* ReferemcerStats = &Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
- ParallelWorkThreadPool.ScheduleWork([this,
- &Ctx,
- &WorkLeft,
- Referencer,
- Index,
- Result = &Result,
- ReferemcerStats,
- &ReferenceValidatorsLock,
- &ReferenceValidators]() {
- ZEN_MEMSCOPE(GetGcTag());
+ ParallelWorkThreadPool.ScheduleWork(
+ [this,
+ &Ctx,
+ &WorkLeft,
+ Referencer,
+ Index,
+ Result = &Result,
+ ReferemcerStats,
+ &ReferenceValidatorsLock,
+ &ReferenceValidators]() {
+ ZEN_MEMSCOPE(GetGcTag());
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- std::vector<GcReferenceValidator*> Validators;
- auto __ = MakeGuard([&Validators]() {
- while (!Validators.empty())
- {
- delete Validators.back();
- Validators.pop_back();
- }
- });
- try
- {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ std::vector<GcReferenceValidator*> Validators;
+ auto __ = MakeGuard([&Validators]() {
+ while (!Validators.empty())
+ {
+ delete Validators.back();
+ Validators.pop_back();
+ }
+ });
+ try
{
- SCOPED_TIMER(ReferemcerStats->second.CreateReferenceValidatorsMS =
- std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Validators = Referencer->CreateReferenceValidators(Ctx);
+ {
+ SCOPED_TIMER(ReferemcerStats->second.CreateReferenceValidatorsMS =
+ std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Validators = Referencer->CreateReferenceValidators(Ctx);
+ }
+ if (!Validators.empty())
+ {
+ RwLock::ExclusiveLockScope __(ReferenceValidatorsLock);
+ for (auto& ReferenceValidator : Validators)
+ {
+ size_t ReferencesStatsIndex = Result->ReferenceValidatorStats.size();
+ Result->ReferenceValidatorStats.push_back({ReferenceValidator->GetGcName(Ctx), {}});
+ ReferenceValidators.insert_or_assign(
+ std::unique_ptr<GcReferenceValidator>(ReferenceValidator),
+ ReferencesStatsIndex);
+ ReferenceValidator = nullptr;
+ }
+ }
}
- if (!Validators.empty())
+ catch (const std::system_error& Ex)
{
- RwLock::ExclusiveLockScope __(ReferenceValidatorsLock);
- for (auto& ReferenceValidator : Validators)
+ if (IsOOD(Ex) || IsOOM(Ex))
{
- size_t ReferencesStatsIndex = Result->ReferenceValidatorStats.size();
- Result->ReferenceValidatorStats.push_back({ReferenceValidator->GetGcName(Ctx), {}});
- ReferenceValidators.insert_or_assign(std::unique_ptr<GcReferenceValidator>(ReferenceValidator),
- ReferencesStatsIndex);
- ReferenceValidator = nullptr;
+ ZEN_WARN("GCV2: Failed creating reference validators for {}. Reason: '{}'",
+ Referencer->GetGcName(Ctx),
+ Ex.what());
+ }
+ else
+ {
+ ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
+ Referencer->GetGcName(Ctx),
+ Ex.what());
}
+ SetCancelGC(true);
}
- }
- catch (const std::system_error& Ex)
- {
- if (IsOOD(Ex) || IsOOM(Ex))
+ catch (const std::bad_alloc& Ex)
{
- ZEN_WARN("GCV2: Failed creating reference validators for {}. Reason: '{}'",
- Referencer->GetGcName(Ctx),
- Ex.what());
+ ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
+ Referencer->GetGcName(Ctx),
+ Ex.what());
+ SetCancelGC(true);
}
- else
+ catch (const std::exception& Ex)
{
ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
Referencer->GetGcName(Ctx),
Ex.what());
+ SetCancelGC(true);
}
- SetCancelGC(true);
- }
- catch (const std::bad_alloc& Ex)
- {
- ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
- Referencer->GetGcName(Ctx),
- Ex.what());
- SetCancelGC(true);
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
- Referencer->GetGcName(Ctx),
- Ex.what());
- SetCancelGC(true);
- }
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
@@ -1221,47 +1226,49 @@ GcManager::CollectGarbage(const GcSettings& Settings)
size_t Index = It.second;
std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
- LockedPhaseThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() {
- ZEN_MEMSCOPE(GetGcTag());
+ LockedPhaseThreadPool.ScheduleWork(
+ [this, &Ctx, Checker, Index, Stats, &WorkLeft]() {
+ ZEN_MEMSCOPE(GetGcTag());
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- try
- {
- SCOPED_TIMER(Stats->second.UpdateLockedStateMS =
- std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Checker->UpdateLockedState(Ctx);
- }
- catch (const std::system_error& Ex)
- {
- if (IsOOD(Ex) || IsOOM(Ex))
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ try
+ {
+ SCOPED_TIMER(Stats->second.UpdateLockedStateMS =
+ std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Checker->UpdateLockedState(Ctx);
+ }
+ catch (const std::system_error& Ex)
+ {
+ if (IsOOD(Ex) || IsOOM(Ex))
+ {
+ ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'",
+ Checker->GetGcName(Ctx),
+ Ex.what());
+ }
+ else
+ {
+ ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'",
+ Checker->GetGcName(Ctx),
+ Ex.what());
+ }
+ SetCancelGC(true);
+ }
+ catch (const std::bad_alloc& Ex)
{
ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'",
Checker->GetGcName(Ctx),
Ex.what());
+ SetCancelGC(true);
}
- else
+ catch (const std::exception& Ex)
{
ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'",
Checker->GetGcName(Ctx),
Ex.what());
+ SetCancelGC(true);
}
- SetCancelGC(true);
- }
- catch (const std::bad_alloc& Ex)
- {
- ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'",
- Checker->GetGcName(Ctx),
- Ex.what());
- SetCancelGC(true);
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'",
- Checker->GetGcName(Ctx),
- Ex.what());
- SetCancelGC(true);
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
@@ -1373,7 +1380,8 @@ GcManager::CollectGarbage(const GcSettings& Settings)
Ex.what());
SetCancelGC(true);
}
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp
index 0ca2adab2..4e7bd79a3 100644
--- a/src/zenstore/workspaces.cpp
+++ b/src/zenstore/workspaces.cpp
@@ -622,17 +622,19 @@ Workspaces::GetWorkspaceShareChunks(const Oid& WorkspaceId,
for (size_t Index = 0; Index < ChunkRequests.size(); Index++)
{
WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&, Index]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- try
- {
- Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]);
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what());
- }
- });
+ WorkerPool.ScheduleWork(
+ [&, Index]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ try
+ {
+ Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
WorkLatch.CountDown();
WorkLatch.Wait();
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
index 2171f4d62..e5e8db8d2 100644
--- a/src/zenutil/buildstoragecache.cpp
+++ b/src/zenutil/buildstoragecache.cpp
@@ -65,21 +65,23 @@ public:
m_PendingBackgroundWorkCount.AddCount(1);
try
{
- m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() {
- ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork");
- auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); });
- if (!m_CancelBackgroundWork)
- {
- try
- {
- Work();
- }
- catch (const std::exception& Ex)
+ m_BackgroundWorkPool.ScheduleWork(
+ [this, Work = std::move(Work)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork");
+ auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); });
+ if (!m_CancelBackgroundWork)
{
- ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what());
+ try
+ {
+ Work();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what());
+ }
}
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
catch (const std::exception& Ex)
{
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index cd1bf7dd7..4f2aad95f 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -805,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats,
RwLock Lock;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t PathIndex : Order)
{
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
index 639c6968c..05146d644 100644
--- a/src/zenutil/include/zenutil/parallelwork.h
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -13,7 +13,7 @@ namespace zen {
class ParallelWork
{
public:
- ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag);
+ ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode);
~ParallelWork();
@@ -26,21 +26,23 @@ public:
m_PendingWork.AddCount(1);
try
{
- WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
- auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
- try
- {
- while (m_PauseFlag && !m_AbortFlag)
+ WorkerPool.ScheduleWork(
+ [this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
+ auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
+ try
{
- Sleep(2000);
+ while (m_PauseFlag && !m_AbortFlag)
+ {
+ Sleep(2000);
+ }
+ Work(m_AbortFlag);
}
- Work(m_AbortFlag);
- }
- catch (...)
- {
- OnError(std::current_exception(), m_AbortFlag);
- }
- });
+ catch (...)
+ {
+ OnError(std::current_exception(), m_AbortFlag);
+ }
+ },
+ m_Mode);
}
catch (const std::exception&)
{
@@ -63,10 +65,11 @@ private:
ExceptionCallback DefaultErrorFunction();
void RethrowErrors();
- std::atomic<bool>& m_AbortFlag;
- std::atomic<bool>& m_PauseFlag;
- bool m_DispatchComplete = false;
- Latch m_PendingWork;
+ std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
+ const WorkerThreadPool::EMode m_Mode;
+ bool m_DispatchComplete = false;
+ Latch m_PendingWork;
RwLock m_ErrorLock;
std::vector<std::exception_ptr> m_Errors;
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
index a571d1d11..95417078a 100644
--- a/src/zenutil/parallelwork.cpp
+++ b/src/zenutil/parallelwork.cpp
@@ -15,9 +15,10 @@
namespace zen {
-ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag)
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode)
: m_AbortFlag(AbortFlag)
, m_PauseFlag(PauseFlag)
+, m_Mode(Mode)
, m_PendingWork(1)
{
}
@@ -160,7 +161,7 @@ TEST_CASE("parallellwork.nowork")
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
Work.Wait();
}
@@ -170,7 +171,7 @@ TEST_CASE("parallellwork.basic")
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
@@ -184,7 +185,7 @@ TEST_CASE("parallellwork.throws_in_work")
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t I = 0; I < 10; I++)
{
Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
@@ -210,7 +211,7 @@ TEST_CASE("parallellwork.throws_in_dispatch")
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
@@ -234,6 +235,26 @@ TEST_CASE("parallellwork.throws_in_dispatch")
}
}
+TEST_CASE("parallellwork.limitqueue")
+{
+	WorkerThreadPool WorkerPool(2);
+
+	// Brace-init: a default-constructed std::atomic<bool> is *uninitialized*
+	// before C++20 (P0883); reading it below would be undefined behavior.
+	std::atomic<bool> AbortFlag{false};
+	std::atomic<bool> PauseFlag{false};
+	ParallelWork	  Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+	for (uint32_t I = 0; I < 5; I++)
+	{
+		Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) {
+			if (AbortFlag.load())
+			{
+				return;
+			}
+			Sleep(10);
+		});
+	}
+	Work.Wait();
+}
+
void
parallellwork_forcelink()
{