diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-10 16:38:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-10 16:38:33 +0200 |
| commit | 339668ac935f781c06225d2d685642e27348772b (patch) | |
| tree | a5552d166eef9b5c72a2f9a6903e584dfc8968d7 | |
| parent | faster oplog entries with referenceset (#488) (diff) | |
| download | zen-339668ac935f781c06225d2d685642e27348772b.tar.xz zen-339668ac935f781c06225d2d685642e27348772b.zip | |
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
31 files changed, 1603 insertions, 1421 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a4a610a2..a7d6f9f01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ kHttpGatewayTimeout = 93, // GatewayTimeout(504) - Improvement: Faster project store `/entries` endpoint, 10-15% faster when using a reference set to limit entries +- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498 - Bugfix: Linux only, set ownership of installed files to specified user when using `zen service install --full --user` ## 5.7.0 diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index d858fa328..1e097edb7 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -635,7 +635,7 @@ namespace { std::atomic<uint64_t> DiscoveredItemCount = 0; std::atomic<uint64_t> DeletedItemCount = 0; std::atomic<uint64_t> DeletedByteCount = 0; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); struct AsyncVisitor : public GetDirectoryContentVisitor { @@ -2210,7 +2210,7 @@ namespace { WorkerThreadPool& NetworkPool = GetNetworkPool(); WorkerThreadPool& VerifyPool = GetIOWorkerPool(); - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); const std::filesystem::path TempFolder = ".zen-tmp"; @@ -2654,7 +2654,7 @@ namespace { FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0; @@ -2888,7 +2888,7 @@ namespace { FilteredRate FilteredCompressedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic<size_t> UploadedBlockSize = 0; std::atomic<size_t> UploadedBlockCount = 0; @@ -3541,56 +3541,58 @@ namespace { FindBlocksStatistics FindBlocksStats; - std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask(std::packaged_task<PrepareBuildResult()>{ - [&Storage, BuildId, FindBlockMaxCount, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] { - ZEN_TRACE_CPU("PrepareBuild"); + std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask( + std::packaged_task<PrepareBuildResult()>{ + [&Storage, BuildId, FindBlockMaxCount, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] { + ZEN_TRACE_CPU("PrepareBuild"); - PrepareBuildResult Result; - Stopwatch Timer; - if (CreateBuild) - { - ZEN_TRACE_CPU("CreateBuild"); - - Stopwatch PutBuildTimer; - CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData); - Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); - Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); - Result.PayloadSize = MetaData.GetSize(); - } - else - { - ZEN_TRACE_CPU("PutBuild"); - Stopwatch GetBuildTimer; - CbObject Build = Storage.BuildStorage->GetBuild(BuildId); - Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); - Result.PayloadSize = Build.GetSize(); - if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + PrepareBuildResult Result; + Stopwatch Timer; + if (CreateBuild) { - Result.PreferredMultipartChunkSize = ChunkSize; + ZEN_TRACE_CPU("CreateBuild"); + + Stopwatch PutBuildTimer; + CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData); + Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); + Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); + Result.PayloadSize = MetaData.GetSize(); } - else if (AllowMultiparts) + else { - ZEN_CONSOLE_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", - NiceBytes(Result.PreferredMultipartChunkSize)); + ZEN_TRACE_CPU("PutBuild"); + Stopwatch GetBuildTimer; + CbObject Build = Storage.BuildStorage->GetBuild(BuildId); + Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); + Result.PayloadSize = Build.GetSize(); + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + Result.PreferredMultipartChunkSize = ChunkSize; + } + else if (AllowMultiparts) + { + ZEN_CONSOLE_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", + NiceBytes(Result.PreferredMultipartChunkSize)); + } } - } - if (!IgnoreExistingBlocks) - { - ZEN_TRACE_CPU("FindBlocks"); - Stopwatch KnownBlocksTimer; - CbObject BlockDescriptionList = Storage.BuildStorage->FindBlocks(BuildId, FindBlockMaxCount); - if (BlockDescriptionList) + if (!IgnoreExistingBlocks) { - Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList); + ZEN_TRACE_CPU("FindBlocks"); + Stopwatch KnownBlocksTimer; + CbObject BlockDescriptionList = Storage.BuildStorage->FindBlocks(BuildId, FindBlockMaxCount); + if (BlockDescriptionList) + { + Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList); + } + FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); + FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); + Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); } - FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); - FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); - Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); - } - Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - return Result; - }}); + Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + return Result; + }}, + WorkerThreadPool::EMode::EnableBacklog); ChunkedFolderContent LocalContent; @@ -4593,7 +4595,7 @@ namespace { WorkerThreadPool& VerifyPool = GetIOWorkerPool(); - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size()); @@ -5694,7 +5696,7 @@ namespace { Stopwatch Timer; auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); }); - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic<uint64_t> CompletedPathCount = 0; uint32_t PathIndex = 0; @@ -6006,7 +6008,7 @@ namespace { ScavengedPaths.resize(ScavengePathCount); ProgressBar ScavengeProgressBar(ProgressMode, "Scavenging"); - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic<uint64_t> PathsFound(0); std::atomic<uint64_t> ChunksFound(0); @@ -6474,7 +6476,7 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing"); - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); struct LooseChunkHashWorkData { @@ -8124,7 +8126,7 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data"); - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t LocalPathIndex : FilesToCache) { @@ -8211,7 +8213,7 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State"); - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); OutLocalFolderState.Paths.resize(RemoteContent.Paths.size()); OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size()); @@ -11931,7 +11933,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return true; }; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); uint32_t Randomizer = 0; auto FileSizeIt = DownloadContent.FileSizes.begin(); diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp index ea7ad79ee..aa23373c4 100644 --- a/src/zen/cmds/cache_cmd.cpp +++ b/src/zen/cmds/cache_cmd.cpp @@ -478,25 +478,28 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a std::span<uint64_t> BatchSizes = std::span<uint64_t>(Sizes).subspan(Offset, Min(Max(SizeCount, 1u), Sizes.size() - Offset)); WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&, BatchSizes, RequestIndex]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - CbPackage Package; - if (m_MaxAttachmentCount > 0 && SizeCount > 0) - { - auto Request = GeneratePutCacheRecordRequest(BatchSizes, RequestIndex); - ZEN_ASSERT(Request.Format(Package)); - } - else - { - auto Request = GeneratePutCacheValueRequest(BatchSizes, RequestIndex); - ZEN_ASSERT(Request.Format(Package)); - } - - if (HttpClient::Response Response = Http.Post("/z$/$rpc", Package, HttpClient::Accept(ZenContentType::kCbPackage)); !Response) - { - ZEN_CONSOLE("{}", Response.ErrorMessage(fmt::format("{}: ", RequestIndex))); - } - }); + WorkerPool.ScheduleWork( + [&, BatchSizes, RequestIndex]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + CbPackage Package; + if (m_MaxAttachmentCount > 0 && SizeCount > 0) + { + auto Request = GeneratePutCacheRecordRequest(BatchSizes, RequestIndex); + ZEN_ASSERT(Request.Format(Package)); + } + else + { + auto Request = GeneratePutCacheValueRequest(BatchSizes, RequestIndex); + ZEN_ASSERT(Request.Format(Package)); + } + + if (HttpClient::Response Response = Http.Post("/z$/$rpc", Package, HttpClient::Accept(ZenContentType::kCbPackage)); + !Response) + { + ZEN_CONSOLE("{}", Response.ErrorMessage(fmt::format("{}: ", RequestIndex))); + } + }, + WorkerThreadPool::EMode::EnableBacklog); Offset += BatchSizes.size(); RequestIndex++; } diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index b7c0e0b7f..8ed52c764 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -2081,7 +2081,8 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg ZEN_CONSOLE_ERROR("Failed writing file to '{}'. Reason: '{}'", TargetPath, Ex.what()); } } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } } }; diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index 4e672eea5..fd6c80e98 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -501,7 +501,8 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) EntryIndex = EntryOffset.fetch_add(m_Stride); } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (!WorkLatch.Wait(1000)) diff --git a/src/zen/cmds/wipe_cmd.cpp b/src/zen/cmds/wipe_cmd.cpp index d2bc2f5c4..7424f2166 100644 --- a/src/zen/cmds/wipe_cmd.cpp +++ b/src/zen/cmds/wipe_cmd.cpp @@ -251,7 +251,7 @@ namespace { return Added; }; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); struct AsyncVisitor : public GetDirectoryContentVisitor { diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 8327838c9..5125beeca 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -2439,26 +2439,28 @@ GetDirectoryContent(const std::filesystem::path& RootDir, PendingWorkCount.AddCount(1); try { - WorkerPool.ScheduleWork([WorkerPool = &WorkerPool, - PendingWorkCount = &PendingWorkCount, - Visitor = Visitor, - Flags = Flags, - Path = std::move(Path), - RelativeRoot = RelativeRoot / DirectoryName]() { - ZEN_ASSERT(Visitor); - auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); }); - try - { - MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor); - FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(Path, SubVisitor); - Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content)); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what()); - } - }); + WorkerPool.ScheduleWork( + [WorkerPool = &WorkerPool, + PendingWorkCount = &PendingWorkCount, + Visitor = Visitor, + Flags = Flags, + Path = std::move(Path), + RelativeRoot = RelativeRoot / DirectoryName]() { + ZEN_ASSERT(Visitor); + auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); }); + try + { + MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor); + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(Path, SubVisitor); + Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content)); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what()); + } + }, + WorkerThreadPool::EMode::DisableBacklog); } catch (const std::exception Ex) { diff --git a/src/zencore/include/zencore/workthreadpool.h b/src/zencore/include/zencore/workthreadpool.h index 62356495c..4c38dd651 100644 --- a/src/zencore/include/zencore/workthreadpool.h +++ b/src/zencore/include/zencore/workthreadpool.h @@ -18,11 +18,7 @@ struct IWork : public RefCounted { virtual void Execute() = 0; - inline std::exception_ptr GetException() { return m_Exception; } - private: - std::exception_ptr m_Exception; - friend class WorkerThreadPool; }; @@ -35,13 +31,18 @@ public: WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName); ~WorkerThreadPool(); - void ScheduleWork(Ref<IWork> Work); - void ScheduleWork(std::function<void()>&& Work); + // Decides what to do if there are no free workers in the pool when the work is submitted + enum class EMode + { + EnableBacklog, // The work will be added to a backlog of work to do + DisableBacklog // The work will be executed synchronously in the caller thread + }; + + void ScheduleWork(Ref<IWork> Work, EMode Mode); + void ScheduleWork(std::function<void()>&& Work, EMode Mode); template<typename Func> - auto EnqueueTask(std::packaged_task<Func> Task); - - [[nodiscard]] size_t PendingWorkItemCount() const; + auto EnqueueTask(std::packaged_task<Func> Task, EMode Mode); private: struct Impl; @@ -54,7 +55,7 @@ private: template<typename Func> auto -WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task) +WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task, EMode Mode) { struct FutureWork : IWork { @@ -67,7 +68,7 @@ WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task) Ref<FutureWork> Work{new FutureWork(std::move(Task))}; auto Future = Work->m_Task.get_future(); - ScheduleWork(std::move(Work)); + ScheduleWork(std::move(Work), Mode); return Future; } diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index 5d727b69c..4aa8c113e 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -109,10 +109,12 @@ public: WorkerCounter.AddCount(1); try { - WorkerPool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); }); - Worker(); - }); + WorkerPool.ScheduleWork( + [&]() { + auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); }); + Worker(); + }, + WorkerThreadPool::EMode::EnableBacklog); return {.Id = NewJobId}; } catch (const std::exception& Ex) @@ -466,34 +468,36 @@ TEST_CASE("JobQueue") for (uint32_t I = 0; I < 100; I++) { JobsLatch.AddCount(1); - Pool.ScheduleWork([&Queue, &JobsLatch, I]() { - auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); - JobsLatch.AddCount(1); - auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) { - auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); - if (Context.IsCancelled()) - { - return; - } - Context.ReportProgress("going to sleep", "", 100, 100); - Sleep(10); - if (Context.IsCancelled()) - { - return; - } - Context.ReportProgress("going to sleep again", "", 100, 50); - if ((I & 0xFF) == 0x10) - { - zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); - } - Sleep(10); - if (Context.IsCancelled()) - { - return; - } - Context.ReportProgress("done", "", 100, 0); - }); - }); + Pool.ScheduleWork( + [&Queue, &JobsLatch, I]() { + auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); + JobsLatch.AddCount(1); + auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) { + auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); + if (Context.IsCancelled()) + { + return; + } + Context.ReportProgress("going to sleep", "", 100, 100); + Sleep(10); + if (Context.IsCancelled()) + { + return; + } + Context.ReportProgress("going to sleep again", "", 100, 50); + if ((I & 0xFF) == 0x10) + { + zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); + } + Sleep(10); + if (Context.IsCancelled()) + { + return; + } + Context.ReportProgress("done", "", 100, 0); + }); + }, + WorkerThreadPool::EMode::EnableBacklog); } auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string { diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index 445fe939e..e241c0de8 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -5,6 +5,7 @@ #include <zencore/blockingqueue.h> #include <zencore/except.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/testing.h> #include <zencore/thread.h> @@ -13,6 +14,10 @@ #include <thread> #include <vector> +ZEN_THIRD_PARTY_INCLUDES_START +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + #define ZEN_USE_WINDOWS_THREADPOOL 1 #if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL @@ -41,18 +46,23 @@ namespace { struct WorkerThreadPool::Impl { + const int m_ThreadCount = 0; PTP_POOL m_ThreadPool = nullptr; PTP_CLEANUP_GROUP m_CleanupGroup = nullptr; TP_CALLBACK_ENVIRON m_CallbackEnvironment; PTP_WORK m_Work = nullptr; - std::string m_WorkerThreadBaseName; - std::atomic<int> m_WorkerThreadCounter{0}; + std::string m_WorkerThreadBaseName; + std::atomic<size_t> m_WorkerThreadCounter{0}; + std::atomic<int> m_FreeWorkerCount{0}; - RwLock m_QueueLock; + mutable RwLock m_QueueLock; std::deque<Ref<IWork>> m_WorkQueue; - Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) + : m_ThreadCount(InThreadCount) + , m_WorkerThreadBaseName(WorkerThreadBaseName) + , m_FreeWorkerCount(m_ThreadCount) { // Thread pool setup @@ -62,11 +72,11 @@ struct WorkerThreadPool::Impl ThrowLastError("CreateThreadpool failed"); } - if (!SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount)) + if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount)) { ThrowLastError("SetThreadpoolThreadMinimum failed"); } - SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2); + SetThreadpoolThreadMaximum(m_ThreadPool, (DWORD)m_ThreadCount); InitializeThreadpoolEnvironment(&m_CallbackEnvironment); @@ -93,12 +103,29 @@ struct WorkerThreadPool::Impl CloseThreadpool(m_ThreadPool); } - void ScheduleWork(Ref<IWork> Work) + [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode) { - m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); }); + if (Mode == WorkerThreadPool::EMode::DisableBacklog) + { + if (m_FreeWorkerCount <= 0) + { + return Work; + } + RwLock::ExclusiveLockScope _(m_QueueLock); + const int QueuedCount = gsl::narrow<int>(m_WorkQueue.size()); + if (QueuedCount >= m_FreeWorkerCount) + { + return Work; + } + m_WorkQueue.push_back(std::move(Work)); + } + else + { + m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); }); + } SubmitThreadpoolWork(m_Work); + return {}; } - [[nodiscard]] size_t PendingWorkItemCount() const { return 0; } static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work) { @@ -109,10 +136,13 @@ struct WorkerThreadPool::Impl void DoWork() { + m_FreeWorkerCount--; + auto _ = MakeGuard([&]() { m_FreeWorkerCount++; }); + if (!t_IsThreadNamed) { t_IsThreadNamed = true; - const int ThreadIndex = ++m_WorkerThreadCounter; + const size_t ThreadIndex = ++m_WorkerThreadCounter; zen::ExtendableStringBuilder<128> ThreadName; ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex; SetCurrentThreadName(ThreadName); @@ -121,7 +151,7 @@ struct WorkerThreadPool::Impl Ref<IWork> WorkFromQueue; { - RwLock::ExclusiveLockScope _{m_QueueLock}; + RwLock::ExclusiveLockScope __{m_QueueLock}; WorkFromQueue = std::move(m_WorkQueue.front()); m_WorkQueue.pop_front(); } @@ -141,20 +171,25 @@ struct WorkerThreadPool::ThreadStartInfo struct WorkerThreadPool::Impl { + const int m_ThreadCount = 0; void WorkerThreadFunction(ThreadStartInfo Info); std::string m_WorkerThreadBaseName; std::vector<std::thread> m_WorkerThreads; BlockingQueue<Ref<IWork>> m_WorkQueue; + std::atomic<int> m_FreeWorkerCount{0}; - Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) + : m_ThreadCount(InThreadCount) + , m_WorkerThreadBaseName(WorkerThreadBaseName) + , m_FreeWorkerCount(m_ThreadCount) { # if ZEN_WITH_TRACE trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str()); # endif - zen::Latch WorkerLatch{InThreadCount}; + zen::Latch WorkerLatch{m_ThreadCount}; - for (int i = 0; i < InThreadCount; ++i) + for (int i = 0; i < m_ThreadCount; ++i) { m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch}); } @@ -181,8 +216,23 @@ struct WorkerThreadPool::Impl m_WorkerThreads.clear(); } - void ScheduleWork(Ref<IWork> Work) { m_WorkQueue.Enqueue(std::move(Work)); } - [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); } + [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode) + { + if (Mode == WorkerThreadPool::EMode::DisableBacklog) + { + if (m_FreeWorkerCount <= 0) + { + return Work; + } + const int QueuedCount = gsl::narrow<int>(m_WorkQueue.Size()); + if (QueuedCount >= m_FreeWorkerCount) + { + return Work; + } + } + m_WorkQueue.Enqueue(std::move(Work)); + return {}; + } }; void @@ -197,21 +247,23 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) Ref<IWork> Work; if (m_WorkQueue.WaitAndDequeue(Work)) { + m_FreeWorkerCount--; + auto _ = MakeGuard([&]() { m_FreeWorkerCount++; }); + try { ZEN_TRACE_CPU_FLUSH("AsyncWork"); Work->Execute(); + Work = {}; } catch (const AssertException& Ex) { - Work->m_Exception = std::current_exception(); - + Work = {}; ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); } catch (const std::exception& e) { - Work->m_Exception = std::current_exception(); - + Work = {}; ZEN_WARN("Caught exception in worker thread: {}", e.what()); } } @@ -243,48 +295,38 @@ WorkerThreadPool::~WorkerThreadPool() } void -WorkerThreadPool::ScheduleWork(Ref<IWork> Work) +WorkerThreadPool::ScheduleWork(Ref<IWork> Work, EMode Mode) { if (m_Impl) { - m_Impl->ScheduleWork(std::move(Work)); - } - else - { - try + if (Work = m_Impl->ScheduleWork(std::move(Work), Mode); !Work) { - ZEN_TRACE_CPU_FLUSH("SyncWork"); - Work->Execute(); - } - catch (const AssertException& Ex) - { - Work->m_Exception = std::current_exception(); - - ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); + return; } - catch (const std::exception& e) - { - Work->m_Exception = std::current_exception(); + } - ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what()); - } + try + { + ZEN_TRACE_CPU_FLUSH("SyncWork"); + Work->Execute(); + Work = {}; + } + catch (const AssertException& Ex) + { + Work = {}; + ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); + } + catch (const std::exception& e) + { + Work = {}; + ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what()); } } void -WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) -{ - ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work)))); -} - -[[nodiscard]] size_t -WorkerThreadPool::PendingWorkItemCount() const +WorkerThreadPool::ScheduleWork(std::function<void()>&& Work, EMode Mode) { - if (m_Impl) - { - return m_Impl->PendingWorkItemCount(); - } - return 0; + ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work))), Mode); } ////////////////////////////////////////////////////////////////////////// @@ -302,9 +344,10 @@ TEST_CASE("threadpool.basic") { WorkerThreadPool Threadpool{1}; - auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }}); - auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }}); - auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }}); + auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }}, WorkerThreadPool::EMode::EnableBacklog); + auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }}, WorkerThreadPool::EMode::EnableBacklog); + auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }}, + WorkerThreadPool::EMode::EnableBacklog); CHECK_EQ(Future42.get(), 42); CHECK_EQ(Future99.get(), 99); diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp index 51e06ae14..5a6232318 100644 --- a/src/zencore/zencore.cpp +++ b/src/zencore/zencore.cpp @@ -350,9 +350,10 @@ TEST_CASE("Assert.Callstack") WorkerThreadPool Pool(1); auto Task = Pool.EnqueueTask(std::packaged_task<int()>{[] { - ZEN_ASSERT(false); - return 1; - }}); + ZEN_ASSERT(false); + return 1; + }}, + WorkerThreadPool::EMode::EnableBacklog); try { diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index e57fa8a30..9bbfef255 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -835,7 +835,7 @@ HttpAsyncWorkRequest::IssueRequest(std::error_code& ErrorCode) ZEN_TRACE_CPU("httpsys::AsyncWork::IssueRequest"); ErrorCode.clear(); - Transaction().Server().WorkPool().ScheduleWork(m_WorkItem); + Transaction().Server().WorkPool().ScheduleWork(m_WorkItem, WorkerThreadPool::EMode::EnableBacklog); } HttpSysRequestHandler* diff --git a/src/zenhttp/transports/winsocktransport.cpp b/src/zenhttp/transports/winsocktransport.cpp index 8c82760bb..c06a50c95 100644 --- a/src/zenhttp/transports/winsocktransport.cpp +++ b/src/zenhttp/transports/winsocktransport.cpp @@ -304,18 +304,20 @@ SocketTransportPluginImpl::Initialize(TransportServer* ServerInterface) TransportServerConnection* ConnectionInterface{m_ServerInterface->CreateConnectionHandler(Connection)}; Connection->Initialize(ConnectionInterface, ClientSocket); - m_WorkerThreadpool->ScheduleWork([Connection] { - try - { - Connection->HandleConnection(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("exception caught in connection loop: {}", Ex.what()); - } - - delete Connection; - }); + m_WorkerThreadpool->ScheduleWork( + [Connection] { + try + { + Connection->HandleConnection(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("exception caught in connection loop: {}", Ex.what()); + } + + delete Connection; + }, + WorkerThreadPool::EMode::EnableBacklog); } else { diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 77ed87cb1..79e5db554 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -2142,20 +2142,23 @@ TEST_CASE("zcache.failing.upstream") for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++) { size_t Iteration = I; - Pool.ScheduleWork([&] { - std::vector<CacheKey> NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); - if (NewKeys.size() != RecordsPerRequest) - { - ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); + Pool.ScheduleWork( + [&] { + std::vector<CacheKey> NewKeys = + PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest); + if (NewKeys.size() != RecordsPerRequest) + { + ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; + } + { + RwLock::ExclusiveLockScope _(KeysLock); + Keys[Iteration].swap(NewKeys); + } Completed.fetch_add(1); - return; - } - { - RwLock::ExclusiveLockScope _(KeysLock); - Keys[Iteration].swap(NewKeys); - } - Completed.fetch_add(1); - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } bool UseUpstream1 = false; while (Completed < ThreadCount * KeyMultiplier) @@ -2206,48 +2209,50 @@ TEST_CASE("zcache.failing.upstream") Completed.fetch_add(1); continue; } - Pool.ScheduleWork([&] { - GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); + Pool.ScheduleWork( + [&] { + GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy); - if (!Result.Success) - { - ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); - Completed.fetch_add(1); - return; - } - - if (Result.Result.Results.size() != LocalKeys.size()) - { - ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); - Completed.fetch_add(1); - return; - } - for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) - { - const CacheKey& ExpectedKey = LocalKeys[Index++]; - if (!Record) - { - continue; - } - if (Record->Key != ExpectedKey) + if (!Result.Success) { - continue; + ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration); + Completed.fetch_add(1); + return; } - if (Record->Values.size() != 1) + + if (Result.Result.Results.size() != LocalKeys.size()) { - continue; + ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration); + Completed.fetch_add(1); + return; } - - for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) { - if (!Value.Body) + const CacheKey& ExpectedKey = LocalKeys[Index++]; + if (!Record) { continue; } + if (Record->Key != ExpectedKey) + { + continue; + } + if (Record->Values.size() != 1) + { + continue; + } + + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + if (!Value.Body) + { + continue; + } + } } - } - Completed.fetch_add(1); - }); + Completed.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (Completed < ThreadCount * KeyMultiplier) { diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index f6ad14422..97522e892 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -73,7 +73,7 @@ GetStatsForDirectory(std::filesystem::path Dir) GetDirectoryContent(Dir, DirectoryContentFlags::IncludeAllEntries | DirectoryContentFlags::IncludeFileSizes, DirTraverser, - GetSmallWorkerPool(EWorkloadType::Burst), + GetSmallWorkerPool(EWorkloadType::Background), PendingWorkCount); PendingWorkCount.CountDown(); PendingWorkCount.Wait(); diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 08d0b12a7..c83065506 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1646,7 +1646,7 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 322af5e69..7cb115110 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1704,7 +1704,7 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir, std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) @@ -2249,7 +2249,7 @@ ProjectStore::Oplog::IterateChunks(const std::filesystem::path& P { std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) @@ -4122,21 +4122,24 @@ ProjectStore::Flush() WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (const Ref<Project>& Project : Projects) { - Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { - try - { - Project->Flush(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); - } - }); + Work.ScheduleWork( + WorkerPool, + [this, Project](std::atomic<bool>&) { + try + { + Project->Flush(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); + } + }, + 0); } } catch (const std::exception& Ex) diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 2a81ee3e3..feafcc810 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -245,101 +245,105 @@ namespace remotestore_impl { const std::vector<IoHash>& Chunks) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &WorkerPool, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &RemoteResult, - Chunks = Chunks, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to load attachments with {} chunks ({}): {}", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (IgnoreMissingAttachments) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - } - return; - } - Info.AttachmentsDownloaded.fetch_add(Chunks.size()); - ZEN_INFO("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + NetworkWorkerPool.ScheduleWork( + [&RemoteStore, + &ChunkStore, + &WorkerPool, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &RemoteResult, + Chunks = Chunks, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) { + ReportMessage(OptionalContext, + fmt::format("Failed to load attachments with {} chunks ({}): {}", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + Info.MissingAttachmentCount.fetch_add(1); + if (IgnoreMissingAttachments) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + } return; } - if (!Chunks.empty()) + Info.AttachmentsDownloaded.fetch_add(Chunks.size()); + ZEN_INFO("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + if (RemoteResult.IsError()) { - try - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - WriteAttachmentBuffers.reserve(Chunks.size()); - WriteRawHashes.reserve(Chunks.size()); - - for (const auto& It : Chunks) + return; + } + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); - WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(It.first); + return; } - std::vector<CidStore::InsertResult> InsertResults = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); - - for (size_t Index = 0; Index < InsertResults.size(); Index++) + if (!Chunks.empty()) { - if (InsertResults[Index].New) + try { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(Chunks.size()); + WriteRawHashes.reserve(Chunks.size()); + + for (const auto& It : Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(It.first); + } + std::vector<CidStore::InsertResult> InsertResults = + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); + + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to bulk save {} attachments", Chunks.size()), + Ex.what()); } } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk save {} attachments", Chunks.size()), - Ex.what()); - } - } - }); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk load {} attachments", Chunks.size()), - Ex.what()); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to bulk load {} attachments", Chunks.size()), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); }; void DownloadAndSaveBlock(CidStore& ChunkStore, @@ -359,226 +363,237 @@ namespace remotestore_impl { uint32_t RetriesLeft) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, - BlockHash, - &RemoteResult, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, - RetriesLeft, - Chunks = Chunks]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); - if (BlockResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment {} ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - } - return; - } + NetworkWorkerPool.ScheduleWork( + [&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + BlockHash, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + Chunks = Chunks]() { + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } - uint64_t BlockSize = BlockResult.Bytes.GetSize(); - Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {} ({})", - BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), - NiceBytes(BlockSize)); - Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, - BlockHash, - &RemoteResult, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, - RetriesLeft, - Chunks = Chunks, - Bytes = std::move(BlockResult.Bytes)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode) { + ReportMessage(OptionalContext, + fmt::format("Failed to download block attachment {} ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + } return; } - try + if (RemoteResult.IsError()) { - ZEN_ASSERT(Bytes.Size() > 0); - std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize); - if (!Compressed) - { - if (RetriesLeft > 0) + return; + } + uint64_t BlockSize = BlockResult.Bytes.GetSize(); + Info.AttachmentBlocksDownloaded.fetch_add(1); + ZEN_INFO("Loaded block attachment '{}' in {} ({})", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlockSize)); + Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + BlockHash, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + Chunks = Chunks, + Bytes = std::move(BlockResult.Bytes)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) { - ReportMessage( - OptionalContext, - fmt::format("Block attachment {} is malformed, can't parse as compressed binary, retrying download", - BlockHash)); - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - std::move(Chunks), - RetriesLeft - 1); + return; } - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), - {}); - return; - } - SharedBuffer BlockPayload = Compressed.Decompress(); - if (!BlockPayload) - { - if (RetriesLeft > 0) + try { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download", - BlockHash)); - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - std::move(Chunks), - RetriesLeft - 1); - } - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash)); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash), - {}); - return; - } - if (RawHash != BlockHash) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), - {}); - return; - } + ZEN_ASSERT(Bytes.Size() > 0); + std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; + WantedChunks.reserve(Chunks.size()); + WantedChunks.insert(Chunks.begin(), Chunks.end()); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize); + if (!Compressed) + { + if (RetriesLeft > 0) + { + ReportMessage( + OptionalContext, + fmt::format( + "Block attachment {} is malformed, can't parse as compressed binary, retrying download", + BlockHash)); + return DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockHash, + std::move(Chunks), + RetriesLeft - 1); + } + ReportMessage( + OptionalContext, + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), + {}); + return; + } + SharedBuffer BlockPayload = Compressed.Decompress(); + if (!BlockPayload) + { + if (RetriesLeft > 0) + { + ReportMessage( + OptionalContext, + fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download", + BlockHash)); + return DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockHash, + std::move(Chunks), + RetriesLeft - 1); + } + ReportMessage(OptionalContext, + fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash)); + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash), + {}); + return; + } + if (RawHash != BlockHash) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), + {}); + return; + } - uint64_t BlockHeaderSize = 0; - bool StoreChunksOK = IterateChunkBlock( - BlockPayload, - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); - } - }, - BlockHeaderSize); - - if (!StoreChunksOK) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - return; - } + uint64_t BlockHeaderSize = 0; + bool StoreChunksOK = IterateChunkBlock( + BlockPayload, + [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, + const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), + RawHash, + RawSize)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + WantedChunks.erase(AttachmentRawHash); + } + }, + BlockHeaderSize); + + if (!StoreChunksOK) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has invalid format ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Invalid format for block {}", BlockHash), + {}); + return; + } - ZEN_ASSERT(WantedChunks.empty()); + ZEN_ASSERT(WantedChunks.empty()); - if (!WriteAttachmentBuffers.empty()) - { - auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (size_t Index = 0; Index < Results.size(); Index++) - { - const auto& Result = Results[Index]; - if (Result.New) + if (!WriteAttachmentBuffers.empty()) { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); + auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const auto& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } } } - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed save block attachment {}", BlockHash), - Ex.what()); - } - }); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to block attachment {}", BlockHash), - Ex.what()); - } - }); + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed save block attachment {}", BlockHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to block attachment {}", BlockHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); }; void DownloadAndSaveAttachment(CidStore& ChunkStore, @@ -596,92 +611,96 @@ namespace remotestore_impl { const IoHash& RawHash) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &WorkerPool, - &RemoteResult, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - RawHash, - &LoadAttachmentsTimer, - &DownloadStartMS, - &Info, - IgnoreMissingAttachments, - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download large attachment {}: '{}', error code : {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode)); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); - } - return; - } - uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); - ZEN_INFO("Loaded large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), - NiceBytes(AttachmentSize)); - Info.AttachmentsDownloaded.fetch_add(1); + NetworkWorkerPool.ScheduleWork( + [&RemoteStore, + &ChunkStore, + &WorkerPool, + &RemoteResult, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + RawHash, + &LoadAttachmentsTimer, + &DownloadStartMS, + &Info, + IgnoreMissingAttachments, + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } - Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWriteLatch, - &RemoteResult, - &Info, - &ChunkStore, - RawHash, - AttachmentSize, - Bytes = std::move(AttachmentResult.Bytes), - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) { - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); - if (InsertResult.New) + ReportMessage(OptionalContext, + fmt::format("Failed to download large attachment {}: '{}', error code : {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode)); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) { - Info.AttachmentBytesStored.fetch_add(AttachmentSize); - Info.AttachmentsStored.fetch_add(1); + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); } + return; } - catch (const std::exception& Ex) + uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); + ZEN_INFO("Loaded large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(AttachmentSize)); + Info.AttachmentsDownloaded.fetch_add(1); + if (RemoteResult.IsError()) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Saving attachment {} failed", RawHash), - Ex.what()); + return; } - }); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Loading attachment {} failed", RawHash), - Ex.what()); - } - }); + Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsWriteLatch, + &RemoteResult, + &Info, + &ChunkStore, + RawHash, + AttachmentSize, + Bytes = std::move(AttachmentResult.Bytes), + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(AttachmentSize); + Info.AttachmentsStored.fetch_add(1); + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Saving attachment {} failed", RawHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Loading attachment {} failed", RawHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); }; void CreateBlock(WorkerThreadPool& WorkerPool, @@ -694,45 +713,47 @@ namespace remotestore_impl { AsyncRemoteResult& RemoteResult) { OpSectionsLatch.AddCount(1); - WorkerPool.ScheduleWork([&Blocks, - &SectionsLock, - &OpSectionsLatch, - BlockIndex, - Chunks = std::move(ChunksInBlock), - &AsyncOnBlock, - &RemoteResult]() mutable { - auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - size_t ChunkCount = Chunks.size(); - try - { - ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - ChunkBlockDescription Block; - CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); + WorkerPool.ScheduleWork( + [&Blocks, + &SectionsLock, + &OpSectionsLatch, + BlockIndex, + Chunks = std::move(ChunksInBlock), + &AsyncOnBlock, + &RemoteResult]() mutable { + auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); + if (RemoteResult.IsError()) { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex] = Block; + return; } - uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); - ZEN_INFO("Generated block with {} attachments in {} ({})", - ChunkCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(BlockSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), - Ex.what()); - } - }); + size_t ChunkCount = Chunks.size(); + try + { + ZEN_ASSERT(ChunkCount > 0); + Stopwatch Timer; + ChunkBlockDescription Block; + CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex] = Block; + } + uint64_t BlockSize = CompressedBlock.GetCompressedSize(); + AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); + ZEN_INFO("Generated block with {} attachments in {} ({})", + ChunkCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + NiceBytes(BlockSize)); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); } struct UploadInfo @@ -861,89 +882,91 @@ namespace remotestore_impl { SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork([&ChunkStore, - &RemoteStore, - &SaveAttachmentsLatch, - &RemoteResult, - RawHash, - &CreatedBlocks, - &LooseFileAttachments, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - IoBuffer Payload; - ChunkBlockDescription Block; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = BlockIt->second.Payload; - Block = BlockIt->second.Block; - } - else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) - { - Payload = LooseTmpFileIt->second(RawHash); - } - else - { - Payload = ChunkStore.FindChunkByCid(RawHash); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_WARN("Failed to save attachment '{}' ({}): {}", - RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - const bool IsBlock = Block.BlockHash == RawHash; - size_t PayloadSize = Payload.GetSize(); - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block)); - if (Result.ErrorCode) + WorkerPool.ScheduleWork( + [&ChunkStore, + &RemoteStore, + &SaveAttachmentsLatch, + &RemoteResult, + RawHash, + &CreatedBlocks, + &LooseFileAttachments, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): {}", - RawHash, - NiceBytes(PayloadSize), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); return; } - if (IsBlock) + try { - Info.AttachmentBlocksUploaded.fetch_add(1); - Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved block attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); + IoBuffer Payload; + ChunkBlockDescription Block; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) + { + Payload = BlockIt->second.Payload; + Block = BlockIt->second.Block; + } + else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) + { + Payload = LooseTmpFileIt->second(RawHash); + } + else + { + Payload = ChunkStore.FindChunkByCid(RawHash); + } + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_WARN("Failed to save attachment '{}' ({}): {}", + RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + const bool IsBlock = Block.BlockHash == RawHash; + size_t PayloadSize = Payload.GetSize(); + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block)); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): {}", + RawHash, + NiceBytes(PayloadSize), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; + } + if (IsBlock) + { + Info.AttachmentBlocksUploaded.fetch_add(1); + Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved block attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } + else + { + Info.AttachmentsUploaded.fetch_add(1); + Info.AttachmentBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } } - else + catch (const std::exception& Ex) { - Info.AttachmentsUploaded.fetch_add(1); - Info.AttachmentBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("To upload attachment {}", RawHash), + Ex.what()); } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("To upload attachment {}", RawHash), - Ex.what()); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } if (IsCancelled(OptionalContext)) @@ -982,66 +1005,68 @@ namespace remotestore_impl { SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &SaveAttachmentsLatch, - &RemoteResult, - NeededChunks = std::move(NeededChunks), - &BulkBlockAttachmentsToUpload, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - size_t ChunksSize = 0; - std::vector<SharedBuffer> ChunkBuffers; - ChunkBuffers.reserve(NeededChunks.size()); - for (const IoHash& Chunk : NeededChunks) + WorkerPool.ScheduleWork( + [&RemoteStore, + &ChunkStore, + &SaveAttachmentsLatch, + &RemoteResult, + NeededChunks = std::move(NeededChunks), + &BulkBlockAttachmentsToUpload, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) { - auto It = BulkBlockAttachmentsToUpload.find(Chunk); - ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompressedBuffer ChunkPayload = It->second(It->first).second; - if (!ChunkPayload) + return; + } + try + { + size_t ChunksSize = 0; + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) + { + auto It = BulkBlockAttachmentsToUpload.find(Chunk); + ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); + CompressedBuffer ChunkPayload = It->second(It->first).second; + if (!ChunkPayload) + { + RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {}"sv, Chunk), + fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); + ChunkBuffers.clear(); + break; + } + ChunksSize += ChunkPayload.GetCompressedSize(); + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer())); + } + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) { - RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {}"sv, Chunk), - fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); - ChunkBuffers.clear(); - break; + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachments with {} chunks ({}): {}", + NeededChunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; } - ChunksSize += ChunkPayload.GetCompressedSize(); - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer())); + Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); + Info.AttachmentBytesUploaded.fetch_add(ChunksSize); + + ZEN_INFO("Saved {} bulk attachments in {} ({})", + NeededChunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(ChunksSize)); } - RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); - if (Result.ErrorCode) + catch (const std::exception& Ex) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachments with {} chunks ({}): {}", - NeededChunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to buck upload {} attachments", NeededChunks.size()), + Ex.what()); } - Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); - Info.AttachmentBytesUploaded.fetch_add(ChunksSize); - - ZEN_INFO("Saved {} bulk attachments in {} ({})", - NeededChunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(ChunksSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to buck upload {} attachments", NeededChunks.size()), - Ex.what()); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } } @@ -1516,148 +1541,152 @@ BuildContainer(CidStore& ChunkStore, ResolveAttachmentsLatch.AddCount(1); - WorkerPool.ScheduleWork([&ChunkStore, - UploadAttachment = &It.second, - RawHash = It.first, - &ResolveAttachmentsLatch, - &ResolveLock, - &ChunkedHashes, - &LargeChunkHashes, - &ChunkedUploadAttachments, - &LooseUploadAttachments, - &MissingHashes, - &OnLargeAttachment, - &AttachmentTempPath, - &ChunkFile, - &ChunkedFiles, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - AllowChunking, - &RemoteResult, - OptionalContext]() { - auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return; - } + WorkerPool.ScheduleWork( + [&ChunkStore, + UploadAttachment = &It.second, + RawHash = It.first, + &ResolveAttachmentsLatch, + &ResolveLock, + &ChunkedHashes, + &LargeChunkHashes, + &ChunkedUploadAttachments, + &LooseUploadAttachments, + &MissingHashes, + &OnLargeAttachment, + &AttachmentTempPath, + &ChunkFile, + &ChunkedFiles, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + AllowChunking, + &RemoteResult, + OptionalContext]() { + auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return; + } - try - { - if (!UploadAttachment->RawPath.empty()) + try { - const std::filesystem::path& FilePath = UploadAttachment->RawPath; - IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); - if (RawData) + if (!UploadAttachment->RawPath.empty()) { - if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit) + const std::filesystem::path& FilePath = UploadAttachment->RawPath; + IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); + if (RawData) { - IoBufferFileReference FileRef; - (void)RawData.GetFileReference(FileRef); - - ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext); - ResolveLock.WithExclusiveLock( - [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { - ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); - ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); - for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) - { - ChunkedHashes.insert(ChunkHash); - } - ChunkedFiles.emplace_back(std::move(Chunked)); + if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit) + { + IoBufferFileReference FileRef; + (void)RawData.GetFileReference(FileRef); + + ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext); + ResolveLock.WithExclusiveLock( + [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { + ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); + ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); + for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) + { + ChunkedHashes.insert(ChunkHash); + } + ChunkedFiles.emplace_back(std::move(Chunked)); + }); + } + else if (RawData.GetSize() > (MaxChunkEmbedSize * 2)) + { + // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't + // it will be a loose attachment instead of going into a block + OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) { + size_t RawSize = RawData.GetSize(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), + OodleCompressor::Mermaid, + OodleCompressionLevel::VeryFast); + + std::filesystem::path AttachmentPath = AttachmentTempPath; + AttachmentPath.append(RawHash.ToHexString()); + IoBuffer TempAttachmentBuffer = + WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); + ZEN_INFO("Saved temp attachment to '{}', {} ({})", + AttachmentPath, + NiceBytes(RawSize), + NiceBytes(TempAttachmentBuffer.GetSize())); + return TempAttachmentBuffer; }); - } - else if (RawData.GetSize() > (MaxChunkEmbedSize * 2)) - { - // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't - // it will be a loose attachment instead of going into a block - OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) { - size_t RawSize = RawData.GetSize(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + uint64_t RawSize = RawData.GetSize(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); + + uint64_t CompressedSize = Compressed.GetCompressedSize(); IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), NiceBytes(TempAttachmentBuffer.GetSize())); - return TempAttachmentBuffer; - }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + + if (CompressedSize > MaxChunkEmbedSize) + { + OnLargeAttachment(RawHash, + [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + UploadAttachment->Size = CompressedSize; + ResolveLock.WithExclusiveLock( + [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { + LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); + }); + } + } } else { - uint64_t RawSize = RawData.GetSize(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), - OodleCompressor::Mermaid, - OodleCompressionLevel::VeryFast); - - std::filesystem::path AttachmentPath = AttachmentTempPath; - AttachmentPath.append(RawHash.ToHexString()); - - uint64_t CompressedSize = Compressed.GetCompressedSize(); - IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); - ZEN_INFO("Saved temp attachment to '{}', {} ({})", - AttachmentPath, - NiceBytes(RawSize), - NiceBytes(TempAttachmentBuffer.GetSize())); - - if (CompressedSize > MaxChunkEmbedSize) - { - OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else - { - UploadAttachment->Size = CompressedSize; - ResolveLock.WithExclusiveLock( - [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { - LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); - }); - } + ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); } } else { - ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); - } - } - else - { - IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); - if (Data) - { - auto GetForChunking = - [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool { - if (Data.IsWholeFile()) - { - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); - if (Compressed) + IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); + if (Data) + { + auto GetForChunking = + [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool { + if (Data.IsWholeFile()) { - if (VerifyRawSize > ChunkFileSizeLimit) + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); + if (Compressed) { - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize; - if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + if (VerifyRawSize > ChunkFileSizeLimit) { - if (CompressionLevel == OodleCompressionLevel::None) + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize; + if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { - CompositeBuffer Decompressed = Compressed.DecompressToComposite(); - if (Decompressed) + if (CompressionLevel == OodleCompressionLevel::None) { - std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); - if (Segments.size() == 1) + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + if (Decompressed) { - IoBuffer DecompressedData = Segments[0].AsIoBuffer(); - if (DecompressedData.GetFileReference(OutFileRef)) + std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); + if (Segments.size() == 1) { - return true; + IoBuffer DecompressedData = Segments[0].AsIoBuffer(); + if (DecompressedData.GetFileReference(OutFileRef)) + { + return true; + } } } } @@ -1665,49 +1694,49 @@ BuildContainer(CidStore& ChunkStore, } } } - } - return false; - }; + return false; + }; - IoBufferFileReference FileRef; - if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef)) - { - ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext); - ResolveLock.WithExclusiveLock( - [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { - ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); - ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); - for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) - { - ChunkedHashes.insert(ChunkHash); - } - ChunkedFiles.emplace_back(std::move(Chunked)); - }); - } - else if (Data.GetSize() > MaxChunkEmbedSize) - { - OnLargeAttachment(RawHash, - [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + IoBufferFileReference FileRef; + if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef)) + { + ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext); + ResolveLock.WithExclusiveLock( + [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { + ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); + ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); + for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) + { + ChunkedHashes.insert(ChunkHash); + } + ChunkedFiles.emplace_back(std::move(Chunked)); + }); + } + else if (Data.GetSize() > MaxChunkEmbedSize) + { + OnLargeAttachment(RawHash, + [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + UploadAttachment->Size = Data.GetSize(); + } } else { - UploadAttachment->Size = Data.GetSize(); + ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); } } - else - { - ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); - } } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to resolve attachment {}", RawHash), - Ex.what()); - } - }); + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to resolve attachment {}", RawHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); } ResolveAttachmentsLatch.CountDown(); @@ -3077,101 +3106,103 @@ LoadOplog(CidStore& ChunkStore, { std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); DechunkLatch.AddCount(1); - WorkerPool.ScheduleWork([&ChunkStore, - &DechunkLatch, - TempFileName, - &Chunked, - &RemoteResult, - IgnoreMissingAttachments, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&DechunkLatch, &TempFileName] { - std::error_code Ec; - if (IsFile(TempFileName, Ec)) - { - RemoveFile(TempFileName, Ec); - if (Ec) + WorkerPool.ScheduleWork( + [&ChunkStore, + &DechunkLatch, + TempFileName, + &Chunked, + &RemoteResult, + IgnoreMissingAttachments, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&DechunkLatch, &TempFileName] { + std::error_code Ec; + if (IsFile(TempFileName, Ec)) { - ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); + RemoveFile(TempFileName, Ec); + if (Ec) + { + ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); + } } - } - DechunkLatch.CountDown(); - }); - try - { - if (RemoteResult.IsError()) - { - return; - } - Stopwatch Timer; - IoBuffer TmpBuffer; + DechunkLatch.CountDown(); + }); + try { - BasicFile TmpFile; - TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate); + if (RemoteResult.IsError()) { - BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - - uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); - BLAKE3Stream HashingStream; - for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) + return; + } + Stopwatch Timer; + IoBuffer TmpBuffer; + { + BasicFile TmpFile; + TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate); { - const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; - IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); - if (!Chunk) - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - // We only add 1 as the resulting missing count will be 1 for the dechunked file - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); + BLAKE3Stream HashingStream; + for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) + { + const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; + IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); + if (!Chunk) { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), - "Missing chunk", + remotestore_impl::ReportMessage( + OptionalContext, fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + } + return; + } + CompositeBuffer Decompressed = + CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); + for (const SharedBuffer& Segment : Decompressed.GetSegments()) + { + MemoryView SegmentData = Segment.GetView(); + HashingStream.Append(SegmentData); + TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); + Offset += SegmentData.GetSize(); } - return; - } - CompositeBuffer Decompressed = - CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); - for (const SharedBuffer& Segment : Decompressed.GetSegments()) - { - MemoryView SegmentData = Segment.GetView(); - HashingStream.Append(SegmentData); - TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); - Offset += SegmentData.GetSize(); } + BLAKE3 RawHash = HashingStream.GetHash(); + ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); + UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); + TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); } - BLAKE3 RawHash = HashingStream.GetHash(); - ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); - UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); - TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); + TmpFile.Close(); + TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); } - TmpFile.Close(); - TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); + CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + + ZEN_INFO("Dechunked attachment {} ({}) in {}", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); - if (InsertResult.New) + catch (const std::exception& Ex) { - Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); - Info.AttachmentsStored.fetch_add(1); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to dechunck file {}", Chunked.RawHash), + Ex.what()); } - - ZEN_INFO("Dechunked attachment {} ({}) in {}", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to dechunck file {}", Chunked.RawHash), - Ex.what()); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } DechunkLatch.CountDown(); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 7b56c64bd..c50f2bb13 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1686,80 +1686,82 @@ TEST_CASE("blockstore.iterate.chunks") Latch WorkLatch(1); Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - bool Continue = Store.IterateBlock( - Locations, - ChunkIndexes, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { - switch (ChunkIndex) - { - case 0: - CHECK(Data); - CHECK(Size == FirstChunkData.size()); - CHECK(std::string((const char*)Data, Size) == FirstChunkData); - break; - case 1: - CHECK(Data); - CHECK(Size == SecondChunkData.size()); - CHECK(std::string((const char*)Data, Size) == SecondChunkData); - break; - case 2: - CHECK(false); - break; - case 3: - CHECK(!Data); - break; - case 4: - CHECK(!Data); - break; - case 5: - CHECK(!Data); - break; - default: - CHECK(false); - break; - } - return true; - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { - switch (ChunkIndex) - { - case 0: - case 1: - CHECK(false); - break; - case 2: - { - CHECK(Size == VeryLargeChunk.size()); - char* Buffer = new char[Size]; - size_t HashOffset = 0; - File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { - memcpy(&Buffer[HashOffset], Data, Size); - HashOffset += Size; - }); - CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); - delete[] Buffer; - } - break; - case 3: - CHECK(false); - break; - case 4: - CHECK(false); - break; - case 5: - CHECK(false); - break; - default: - CHECK(false); - break; - } - return true; - }, - 0); - CHECK(Continue); - }); + WorkerPool.ScheduleWork( + [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + bool Continue = Store.IterateBlock( + Locations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { + switch (ChunkIndex) + { + case 0: + CHECK(Data); + CHECK(Size == FirstChunkData.size()); + CHECK(std::string((const char*)Data, Size) == FirstChunkData); + break; + case 1: + CHECK(Data); + CHECK(Size == SecondChunkData.size()); + CHECK(std::string((const char*)Data, Size) == SecondChunkData); + break; + case 2: + CHECK(false); + break; + case 3: + CHECK(!Data); + break; + case 4: + CHECK(!Data); + break; + case 5: + CHECK(!Data); + break; + default: + CHECK(false); + break; + } + return true; + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { + switch (ChunkIndex) + { + case 0: + case 1: + CHECK(false); + break; + case 2: + { + CHECK(Size == VeryLargeChunk.size()); + char* Buffer = new char[Size]; + size_t HashOffset = 0; + File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { + memcpy(&Buffer[HashOffset], Data, Size); + HashOffset += Size; + }); + CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); + delete[] Buffer; + } + break; + case 3: + CHECK(false); + break; + case 4: + CHECK(false); + break; + case 5: + CHECK(false); + break; + default: + CHECK(false); + break; + } + return true; + }, + 0); + CHECK(Continue); + }, + WorkerThreadPool::EMode::EnableBacklog); return true; }); WorkLatch.CountDown(); @@ -1796,11 +1798,15 @@ TEST_CASE("blockstore.thread.read.write") std::atomic<size_t> WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + ChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1810,13 +1816,15 @@ TEST_CASE("blockstore.thread.read.write") WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1828,20 +1836,24 @@ TEST_CASE("blockstore.thread.read.write") WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { - SecondChunkLocations[ChunkIndex] = L; - }); - WorkCompleted.fetch_add(1); - }); - WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + SecondChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size() * 2) { diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index d65c2bf06..4539746ba 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -376,7 +376,7 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB { std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); for (size_t Index = 0; Index < Metadatas.size(); Index++) { Work.ScheduleWork( diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index cacbbd966..fd52cdab5 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -4215,7 +4215,7 @@ ZenCacheDiskLayer::DiscoverBuckets() WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (auto& BucketPath : FoundBucketDirectories) @@ -4387,7 +4387,7 @@ ZenCacheDiskLayer::Flush() WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (auto& Bucket : Buckets) @@ -4434,7 +4434,8 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { # if 1 Results.push_back(Ctx.ThreadPool().EnqueueTask( - std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); + std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}, + WorkerThreadPool::EMode::EnableBacklog)); # else CacheBucket& Bucket = *Kv.second; Bucket.ScrubStorage(Ctx); diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 1f2d6c37f..3f27e6d21 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -1574,10 +1574,12 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, &Chunk]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1612,16 +1614,18 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - std::string Bucket = Chunk.second.Bucket; - IoHash ChunkHash = Chunk.first; - ZenCacheValue CacheValue; + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, &Chunk]() { + std::string Bucket = Chunk.second.Bucket; + IoHash ChunkHash = Chunk.first; + ZenCacheValue CacheValue; - CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); - IoHash Hash = IoHash::HashBuffer(CacheValue.Value); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); + CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); + IoHash Hash = IoHash::HashBuffer(CacheValue.Value); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1655,23 +1659,27 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic_uint32_t AddedChunkCount = 0; for (const auto& Chunk : NewChunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) - { - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - } - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + } + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (AddedChunkCount.load() < NewChunks.size()) { @@ -1710,12 +1718,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : GcChunkHashes) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (WorkCompleted < GcChunkHashes.size()) { @@ -1848,11 +1858,13 @@ TEST_CASE("cachestore.drop.bucket") CHECK(Value1.Value); std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - WorkComplete = true; - }); + Workers.ScheduleWork( + [&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + WorkComplete = true; + }, + WorkerThreadPool::EMode::EnableBacklog); // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket // Our DropBucket execution blocks any incoming request from completing until we are done with the drop CHECK(Zcs.DropBucket(Namespace, Bucket)); @@ -1931,14 +1943,16 @@ TEST_CASE("cachestore.drop.namespace") CHECK(Value4.Value); std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - Value2.Value = IoBuffer{}; - Value3.Value = IoBuffer{}; - Value4.Value = IoBuffer{}; - WorkComplete = true; - }); + Workers.ScheduleWork( + [&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + Value2.Value = IoBuffer{}; + Value3.Value = IoBuffer{}; + Value4.Value = IoBuffer{}; + WorkComplete = true; + }, + WorkerThreadPool::EMode::EnableBacklog); // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket // Our DropBucket execution blocks any incoming request from completing until we are done with the drop CHECK(Zcs.DropNamespace(Namespace1)); diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 6b89beb3d..49d24c21e 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -132,13 +132,18 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig) WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Burst); std::vector<std::future<void>> Work; Work.emplace_back( - WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }})); - Work.emplace_back(WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { - m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block - }})); - Work.emplace_back(WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { - m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block - }})); + WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }}, + WorkerThreadPool::EMode::DisableBacklog)); + Work.emplace_back(WorkerPool.EnqueueTask( + std::packaged_task<void()>{[&]() { + m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block + }}, + WorkerThreadPool::EMode::DisableBacklog)); + Work.emplace_back(WorkerPool.EnqueueTask( + std::packaged_task<void()>{[&]() { + m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block + }}, + WorkerThreadPool::EMode::DisableBacklog)); for (std::future<void>& Result : Work) { if (Result.valid()) diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index b00abb2cb..b7bfbd188 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -412,7 +412,7 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas std::atomic<bool> AbortFlag; { std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { const bool Continue = m_BlockStore.IterateChunks( @@ -1491,11 +1491,13 @@ TEST_CASE("compactcas.threadedinsert") { const IoHash& Hash = Chunk.first; const IoBuffer& Buffer = Chunk.second; - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); - ZEN_ASSERT(InsertResult.New); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Buffer, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); + ZEN_ASSERT(InsertResult.New); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1511,13 +1513,15 @@ TEST_CASE("compactcas.threadedinsert") { for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - IoHash Hash = IoHash::HashBuffer(Buffer); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, &Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Buffer); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1548,23 +1552,27 @@ TEST_CASE("compactcas.threadedinsert") for (const auto& Chunk : NewChunks) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { - Cas.InsertChunk(Chunk.second, Chunk.first); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { + Cas.InsertChunk(Chunk.second, Chunk.first); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - if (Buffer) - { - CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); - } - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + if (Buffer) + { + CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); + } + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } tsl::robin_set<IoHash, IoHash::Hasher> ChunksToDelete; @@ -1649,11 +1657,13 @@ TEST_CASE("compactcas.threadedinsert") WorkCompleted = 0; for (const IoHash& ChunkHash : GcChunkHashes) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { - CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < GcChunkHashes.size()) { @@ -1711,7 +1721,8 @@ TEST_CASE("compactcas.restart") RwLock::ExclusiveLockScope __(InsertLock); Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } - }); + }, + WorkerThreadPool::EMode::DisableBacklog); Offset += BatchCount; } WorkLatch.CountDown(); @@ -1964,7 +1975,8 @@ TEST_CASE("compactcas.iteratechunks") RwLock::ExclusiveLockScope __(InsertLock); Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); Offset += BatchCount; } WorkLatch.CountDown(); @@ -1998,82 +2010,84 @@ TEST_CASE("compactcas.iteratechunks") for (size_t I = 0; I < 2; I++) { WorkLatch.AddCount(1); - ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - std::vector<IoHash> PartialHashes; - PartialHashes.reserve(Hashes.size() / 4); - for (size_t Index = 0; Index < Hashes.size(); Index++) - { - size_t TestIndex = Index + I; - if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1)) + ThreadPool.ScheduleWork( + [&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + std::vector<IoHash> PartialHashes; + PartialHashes.reserve(Hashes.size() / 4); + for (size_t Index = 0; Index < Hashes.size(); Index++) { - PartialHashes.push_back(Hashes[Index]); + size_t TestIndex = Index + I; + if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1)) + { + PartialHashes.push_back(Hashes[Index]); + } } - } - std::reverse(PartialHashes.begin(), PartialHashes.end()); + std::reverse(PartialHashes.begin(), PartialHashes.end()); - std::vector<IoHash> NoFoundHashes; - std::vector<size_t> NoFindIndexes; + std::vector<IoHash> NoFoundHashes; + std::vector<size_t> NoFindIndexes; - NoFoundHashes.reserve(9); - for (size_t J = 0; J < 9; J++) - { - std::string Data = fmt::format("oh no, we don't exist {}", J + 1); - NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length())); - } - - NoFindIndexes.reserve(9); - - // Sprinkle in chunks that are not found! - auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - - std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size()); - std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size()); - - CHECK(Cas.IterateChunks( - PartialHashes, - [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) { - CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index)); - uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1); - CHECK(PreviousCount == 0); - FoundFlags[Index] = !!Payload; - const IoHash& Hash = PartialHashes[Index]; - CHECK(Hash == IoHash::HashBuffer(Payload)); - return true; - }, - &BatchWorkerPool, - 2048u)); - - for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++) - { - CHECK(FetchedCounts[FoundIndex].load() <= 1); - if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end()) + NoFoundHashes.reserve(9); + for (size_t J = 0; J < 9; J++) { - CHECK(FoundFlags[FoundIndex]); + std::string Data = fmt::format("oh no, we don't exist {}", J + 1); + NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length())); } - else + + NoFindIndexes.reserve(9); + + // Sprinkle in chunks that are not found! + auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + + std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size()); + std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size()); + + CHECK(Cas.IterateChunks( + PartialHashes, + [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) { + CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index)); + uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1); + CHECK(PreviousCount == 0); + FoundFlags[Index] = !!Payload; + const IoHash& Hash = PartialHashes[Index]; + CHECK(Hash == IoHash::HashBuffer(Payload)); + return true; + }, + &BatchWorkerPool, + 2048u)); + + for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++) { - CHECK(!FoundFlags[FoundIndex]); + CHECK(FetchedCounts[FoundIndex].load() <= 1); + if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end()) + { + CHECK(FoundFlags[FoundIndex]); + } + else + { + CHECK(!FoundFlags[FoundIndex]); + } } - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } WorkLatch.CountDown(); WorkLatch.Wait(); diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 68644be2d..365a933c1 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -665,7 +665,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 5023695b2..b08e6a3ca 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -869,7 +869,8 @@ GcManager::CollectGarbage(const GcSettings& Settings) Ex.what()); SetCancelGC(true); } - }); + }, + WorkerThreadPool::EMode::DisableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); @@ -981,7 +982,8 @@ GcManager::CollectGarbage(const GcSettings& Settings) SetCancelGC(true); } } - }); + }, + WorkerThreadPool::EMode::DisableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); @@ -1021,77 +1023,80 @@ GcManager::CollectGarbage(const GcSettings& Settings) GcReferencer* Referencer = m_GcReferencers[Index]; std::pair<std::string, GcReferencerStats>* ReferemcerStats = &Result.ReferencerStats[Index]; WorkLeft.AddCount(1); - ParallelWorkThreadPool.ScheduleWork([this, - &Ctx, - &WorkLeft, - Referencer, - Index, - Result = &Result, - ReferemcerStats, - &ReferenceValidatorsLock, - &ReferenceValidators]() { - ZEN_MEMSCOPE(GetGcTag()); + ParallelWorkThreadPool.ScheduleWork( + [this, + &Ctx, + &WorkLeft, + Referencer, + Index, + Result = &Result, + ReferemcerStats, + &ReferenceValidatorsLock, + &ReferenceValidators]() { + ZEN_MEMSCOPE(GetGcTag()); - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - std::vector<GcReferenceValidator*> Validators; - auto __ = MakeGuard([&Validators]() { - while (!Validators.empty()) - { - delete Validators.back(); - Validators.pop_back(); - } - }); - try - { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + std::vector<GcReferenceValidator*> Validators; + auto __ = MakeGuard([&Validators]() { + while (!Validators.empty()) + { + delete Validators.back(); + Validators.pop_back(); + } + }); + try { - SCOPED_TIMER(ReferemcerStats->second.CreateReferenceValidatorsMS = - std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Validators = Referencer->CreateReferenceValidators(Ctx); + { + SCOPED_TIMER(ReferemcerStats->second.CreateReferenceValidatorsMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Validators = Referencer->CreateReferenceValidators(Ctx); + } + if (!Validators.empty()) + { + RwLock::ExclusiveLockScope __(ReferenceValidatorsLock); + for (auto& ReferenceValidator : Validators) + { + size_t ReferencesStatsIndex = Result->ReferenceValidatorStats.size(); + Result->ReferenceValidatorStats.push_back({ReferenceValidator->GetGcName(Ctx), {}}); + ReferenceValidators.insert_or_assign( + std::unique_ptr<GcReferenceValidator>(ReferenceValidator), + ReferencesStatsIndex); + ReferenceValidator = nullptr; + } + } } - if (!Validators.empty()) + catch (const std::system_error& Ex) { - RwLock::ExclusiveLockScope __(ReferenceValidatorsLock); - for (auto& ReferenceValidator : Validators) + if (IsOOD(Ex) || IsOOM(Ex)) { - size_t ReferencesStatsIndex = Result->ReferenceValidatorStats.size(); - Result->ReferenceValidatorStats.push_back({ReferenceValidator->GetGcName(Ctx), {}}); - ReferenceValidators.insert_or_assign(std::unique_ptr<GcReferenceValidator>(ReferenceValidator), - ReferencesStatsIndex); - ReferenceValidator = nullptr; + ZEN_WARN("GCV2: Failed creating reference validators for {}. Reason: '{}'", + Referencer->GetGcName(Ctx), + Ex.what()); + } + else + { + ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'", + Referencer->GetGcName(Ctx), + Ex.what()); } + SetCancelGC(true); } - } - catch (const std::system_error& Ex) - { - if (IsOOD(Ex) || IsOOM(Ex)) + catch (const std::bad_alloc& Ex) { - ZEN_WARN("GCV2: Failed creating reference validators for {}. Reason: '{}'", - Referencer->GetGcName(Ctx), - Ex.what()); + ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'", + Referencer->GetGcName(Ctx), + Ex.what()); + SetCancelGC(true); } - else + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'", Referencer->GetGcName(Ctx), Ex.what()); + SetCancelGC(true); } - SetCancelGC(true); - } - catch (const std::bad_alloc& Ex) - { - ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'", - Referencer->GetGcName(Ctx), - Ex.what()); - SetCancelGC(true); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'", - Referencer->GetGcName(Ctx), - Ex.what()); - SetCancelGC(true); - } - }); + }, + WorkerThreadPool::EMode::DisableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); @@ -1221,47 +1226,49 @@ GcManager::CollectGarbage(const GcSettings& Settings) size_t Index = It.second; std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; WorkLeft.AddCount(1); - LockedPhaseThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() { - ZEN_MEMSCOPE(GetGcTag()); + LockedPhaseThreadPool.ScheduleWork( + [this, &Ctx, Checker, Index, Stats, &WorkLeft]() { + ZEN_MEMSCOPE(GetGcTag()); - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - try - { - SCOPED_TIMER(Stats->second.UpdateLockedStateMS = - std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Checker->UpdateLockedState(Ctx); - } - catch (const std::system_error& Ex) - { - if (IsOOD(Ex) || IsOOM(Ex)) + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + try + { + SCOPED_TIMER(Stats->second.UpdateLockedStateMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Checker->UpdateLockedState(Ctx); + } + catch (const std::system_error& Ex) + { + if (IsOOD(Ex) || IsOOM(Ex)) + { + ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'", + Checker->GetGcName(Ctx), + Ex.what()); + } + else + { + ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'", + Checker->GetGcName(Ctx), + Ex.what()); + } + SetCancelGC(true); + } + catch (const std::bad_alloc& Ex) { ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); + SetCancelGC(true); } - else + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); + SetCancelGC(true); } - SetCancelGC(true); - } - catch (const std::bad_alloc& Ex) - { - ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'", - Checker->GetGcName(Ctx), - Ex.what()); - SetCancelGC(true); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'", - Checker->GetGcName(Ctx), - Ex.what()); - SetCancelGC(true); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); @@ -1373,7 +1380,8 @@ GcManager::CollectGarbage(const GcSettings& Settings) Ex.what()); SetCancelGC(true); } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp index 0ca2adab2..4e7bd79a3 100644 --- a/src/zenstore/workspaces.cpp +++ b/src/zenstore/workspaces.cpp @@ -622,17 +622,19 @@ Workspaces::GetWorkspaceShareChunks(const Oid& WorkspaceId, for (size_t Index = 0; Index < ChunkRequests.size(); Index++) { WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&, Index]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - try - { - Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what()); - } - }); + WorkerPool.ScheduleWork( + [&, Index]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + try + { + Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what()); + } + }, + WorkerThreadPool::EMode::DisableBacklog); } WorkLatch.CountDown(); WorkLatch.Wait(); diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index 2171f4d62..e5e8db8d2 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -65,21 +65,23 @@ public: m_PendingBackgroundWorkCount.AddCount(1); try { - m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); - auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); - if (!m_CancelBackgroundWork) - { - try - { - Work(); - } - catch (const std::exception& Ex) + m_BackgroundWorkPool.ScheduleWork( + [this, Work = std::move(Work)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); + auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); + if (!m_CancelBackgroundWork) { - ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); + try + { + Work(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); + } } - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } catch (const std::exception& Ex) { diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index cd1bf7dd7..4f2aad95f 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -805,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, RwLock Lock; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t PathIndex : Order) { diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h index 639c6968c..05146d644 100644 --- a/src/zenutil/include/zenutil/parallelwork.h +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -13,7 +13,7 @@ namespace zen { class ParallelWork { public: - ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag); + ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode); ~ParallelWork(); @@ -26,21 +26,23 @@ public: m_PendingWork.AddCount(1); try { - WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { - auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); - try - { - while (m_PauseFlag && !m_AbortFlag) + WorkerPool.ScheduleWork( + [this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); + try { - Sleep(2000); + while (m_PauseFlag && !m_AbortFlag) + { + Sleep(2000); + } + Work(m_AbortFlag); } - Work(m_AbortFlag); - } - catch (...) - { - OnError(std::current_exception(), m_AbortFlag); - } - }); + catch (...) + { + OnError(std::current_exception(), m_AbortFlag); + } + }, + m_Mode); } catch (const std::exception&) { @@ -63,10 +65,11 @@ private: ExceptionCallback DefaultErrorFunction(); void RethrowErrors(); - std::atomic<bool>& m_AbortFlag; - std::atomic<bool>& m_PauseFlag; - bool m_DispatchComplete = false; - Latch m_PendingWork; + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + const WorkerThreadPool::EMode m_Mode; + bool m_DispatchComplete = false; + Latch m_PendingWork; RwLock m_ErrorLock; std::vector<std::exception_ptr> m_Errors; diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index a571d1d11..95417078a 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -15,9 +15,10 @@ namespace zen { -ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) +ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode) : m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) +, m_Mode(Mode) , m_PendingWork(1) { } @@ -160,7 +161,7 @@ TEST_CASE("parallellwork.nowork") { std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); Work.Wait(); } @@ -170,7 +171,7 @@ TEST_CASE("parallellwork.basic") std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); @@ -184,7 +185,7 @@ TEST_CASE("parallellwork.throws_in_work") std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 10; I++) { Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { @@ -210,7 +211,7 @@ TEST_CASE("parallellwork.throws_in_dispatch") { std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { @@ -234,6 +235,26 @@ TEST_CASE("parallellwork.throws_in_dispatch") } } +TEST_CASE("parallellwork.limitqueue") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + Sleep(10); + }); + } + Work.Wait(); +} + void parallellwork_forcelink() { |