diff options
| author | Dan Engelbrecht <[email protected]> | 2024-08-22 16:03:01 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-08-22 16:03:01 +0200 |
| commit | 203d3f03f0d0ef51f414b5344462bde0a8fcaf1b (patch) | |
| tree | 2d0cbe07dbf6b2d81a91e15c823ea0209205b39c /src | |
| parent | safer calls to IsProcessRunning (#131) (diff) | |
| download | zen-203d3f03f0d0ef51f414b5344462bde0a8fcaf1b.tar.xz zen-203d3f03f0d0ef51f414b5344462bde0a8fcaf1b.zip | |
separate worker pools into burst/background to avoid background jobs blocking client requests (#134)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 10 | ||||
| -rw-r--r-- | src/zenserver/workspaces/httpworkspaces.cpp | 12 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 4 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/workerpools.h | 12 | ||||
| -rw-r--r-- | src/zenutil/workerpools.cpp | 118 |
8 files changed, 76 insertions, 90 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index dd007c8b4..803980b2b 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3227,7 +3227,7 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, } return true; }, - &GetSmallWorkerPool()); + &GetSmallWorkerPool(EWorkloadType::Burst)); } CbObjectWriter Response; @@ -3323,7 +3323,7 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, Sizes.resize(Hashes.size(), 0u); } - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // GetSyncWorkerPool(); + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); (void)FoundLog->IterateChunks( Hashes, [&](size_t Index, const IoBuffer& Chunk) -> bool { diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 15d329442..ab31d5ec5 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -1418,7 +1418,7 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) { - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(); + WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); AsyncRemoteResult RemoteResult; CbObject ContainerObject = BuildContainer(ChunkStore, @@ -1777,8 +1777,8 @@ SaveOplog(CidStore& ChunkStore, UploadInfo Info; - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(); - WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(); + WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); + WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); @@ -2380,8 +2380,8 @@ LoadOplog(CidStore& ChunkStore, Stopwatch Timer; - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(); - WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(); + WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); + WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(EWorkloadType::Background); std::unordered_set<IoHash, IoHash::Hasher> Attachments; uint64_t BlockCountToDownload = 0; diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp index 7242b2fba..6a4e9c466 100644 --- a/src/zenserver/workspaces/httpworkspaces.cpp +++ b/src/zenserver/workspaces/httpworkspaces.cpp @@ -662,7 +662,7 @@ HttpWorkspacesService::FilesRequest(HttpRouterRequest& Req, const Oid& Workspace const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); std::optional<std::vector<Workspaces::ShareFile>> Files = - m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool()); + m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool(EWorkloadType::Burst)); if (!Files.has_value()) { return ServerRequest.WriteResponse(HttpResponseCode::NotFound); @@ -706,7 +706,8 @@ void HttpWorkspacesService::ChunkInfoRequest(HttpRouterRequest& Req, const Oid& WorkspaceId, const Oid& ShareId, const Oid& ChunkId) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - Workspaces::ShareFile File = m_Workspaces.GetWorkspaceShareChunkInfo(WorkspaceId, ShareId, ChunkId, GetSmallWorkerPool()); + Workspaces::ShareFile File = + m_Workspaces.GetWorkspaceShareChunkInfo(WorkspaceId, ShareId, ChunkId, GetSmallWorkerPool(EWorkloadType::Burst)); if (File.Id != Oid::Zero) { CbObjectWriter Response; @@ -738,7 +739,8 @@ HttpWorkspacesService::BatchRequest(HttpRouterRequest& Req, const Oid& Workspace [](const RequestChunkEntry& Entry) { return Workspaces::ChunkRequest{.ChunkId = Entry.ChunkId, .Offset = Entry.Offset, .Size = Entry.RequestBytes}; }); - std::vector<IoBuffer> Chunks = m_Workspaces.GetWorkspaceShareChunks(WorkspaceId, ShareId, Requests, GetSmallWorkerPool()); + std::vector<IoBuffer> Chunks = + m_Workspaces.GetWorkspaceShareChunks(WorkspaceId, ShareId, Requests, GetSmallWorkerPool(EWorkloadType::Burst)); if (Chunks.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::NotFound); @@ -794,7 +796,7 @@ HttpWorkspacesService::EntriesRequest(HttpRouterRequest& Req, const Oid& Workspa m_WorkspacesStats.WorkspaceShareEntriesReadCount++; std::optional<std::vector<Workspaces::ShareFile>> Files = - m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool()); + m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool(EWorkloadType::Burst)); if (!Files.has_value()) { return ServerRequest.WriteResponse(HttpResponseCode::NotFound); @@ -900,7 +902,7 @@ HttpWorkspacesService::ChunkRequest(HttpRouterRequest& Req, const Oid& Workspace WorkspaceId, ShareId, std::vector<Workspaces::ChunkRequest>{Workspaces::ChunkRequest{.ChunkId = ChunkId, .Offset = Offset, .Size = Size}}, - GetSmallWorkerPool()); + GetSmallWorkerPool(EWorkloadType::Burst)); if (!Response.empty() && Response[0]) { m_WorkspacesStats.WorkspaceShareChunkHitCount++; diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 67910fa7f..5d167fc47 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -4035,7 +4035,7 @@ ZenCacheDiskLayer::DiscoverBuckets() RwLock SyncLock; - WorkerThreadPool& Pool = GetLargeWorkerPool(); + WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); Latch WorkLatch(1); for (auto& BucketPath : FoundBucketDirectories) { @@ -4149,7 +4149,7 @@ ZenCacheDiskLayer::Flush() } } { - WorkerThreadPool& Pool = GetMediumWorkerPool(); + WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); Latch WorkLatch(1); try { diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index f300c08e3..871558a52 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -113,7 +113,7 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig) // Initialize payload storage { - WorkerThreadPool& WorkerPool = GetMediumWorkerPool(); + WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Burst); std::vector<std::future<void>> Work; Work.emplace_back( WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }})); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 3f3adeb82..d6bf99c3e 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -632,7 +632,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) std::unordered_map<std::unique_ptr<GcStoreCompactor>, GcCompactStoreStats*> StoreCompactors; RwLock StoreCompactorsLock; - WorkerThreadPool& ThreadPool = Settings.SingleThread ? GetSyncWorkerPool() : GetMediumWorkerPool(); + WorkerThreadPool& ThreadPool = Settings.SingleThread ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Background); ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size()); if (!m_GcReferencers.empty()) @@ -2097,7 +2097,7 @@ GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds Time Stopwatch Timer; ZEN_INFO("scrubbing STARTING (delete mode => {}, skip CID => {})", DoDelete, SkipCid); - WorkerThreadPool& ThreadPool = GetMediumWorkerPool(); + WorkerThreadPool& ThreadPool = GetMediumWorkerPool(EWorkloadType::Background); ScrubContext Ctx{ThreadPool, Deadline}; try diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h index 78a8e5c5e..9683ad720 100644 --- a/src/zenutil/include/zenutil/workerpools.h +++ b/src/zenutil/include/zenutil/workerpools.h @@ -6,14 +6,20 @@ namespace zen { +enum class EWorkloadType +{ + Burst, // Used when you want to respond quickly, such as requests from a client + Background // Used for background jobs such as GC/Scrub/oplog import-export +}; + // Worker pool with std::thread::hardware_concurrency() worker threads, but at least one thread -WorkerThreadPool& GetLargeWorkerPool(); +WorkerThreadPool& GetLargeWorkerPool(EWorkloadType WorkloadType); // Worker pool with std::thread::hardware_concurrency() / 4 worker threads, but at least one thread -WorkerThreadPool& GetMediumWorkerPool(); +WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType); // Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread -WorkerThreadPool& GetSmallWorkerPool(); +WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType); // Special worker pool that does not use worker thread but issues all scheduled work on the calling thread // This is useful for debugging when multiple async thread can make stepping in debugger complicated diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp index 144ef6817..e3165e838 100644 --- a/src/zenutil/workerpools.cpp +++ b/src/zenutil/workerpools.cpp @@ -19,90 +19,65 @@ namespace { RwLock PoolLock; - std::unique_ptr<WorkerThreadPool> LargeWorkerPool; - std::unique_ptr<WorkerThreadPool> MediumWorkerPool; - std::unique_ptr<WorkerThreadPool> SmallWorkerPool; - std::unique_ptr<WorkerThreadPool> SyncWorkerPool; -} // namespace + struct WorkerPool + { + std::unique_ptr<WorkerThreadPool> Pool; + const int TreadCount; + const std::string_view Name; + }; -WorkerThreadPool& -GetLargeWorkerPool() -{ + WorkerPool BurstLargeWorkerPool = {.TreadCount = LargeWorkerThreadPoolTreadCount, .Name = "LargeThreadPool(burst)"}; + WorkerPool BackgroundLargeWorkerPool = {.TreadCount = LargeWorkerThreadPoolTreadCount, .Name = "LargeThreadPool(bkg)"}; + + WorkerPool BurstMediumWorkerPool = {.TreadCount = MediumWorkerThreadPoolTreadCount, .Name = "MediumThreadPool(burst)"}; + WorkerPool BackgroundMediumWorkerPool = {.TreadCount = MediumWorkerThreadPoolTreadCount, .Name = "MediumThreadPool(bkg)"}; + + WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"}; + WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"}; + + WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"}; + + WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool) { - RwLock::SharedLockScope _(PoolLock); - if (LargeWorkerPool) { - return *LargeWorkerPool; + RwLock::SharedLockScope _(PoolLock); + if (Pool.Pool) + { + return *Pool.Pool; + } } + RwLock::ExclusiveLockScope _(PoolLock); + ZEN_ASSERT(!IsShutDown); + if (!Pool.Pool) + { + Pool.Pool.reset(new WorkerThreadPool(Pool.TreadCount, Pool.Name)); + } + return *Pool.Pool; } - RwLock::ExclusiveLockScope _(PoolLock); - ZEN_ASSERT(!IsShutDown); - if (LargeWorkerPool) - { - return *LargeWorkerPool; - } - LargeWorkerPool.reset(new WorkerThreadPool(LargeWorkerThreadPoolTreadCount, "LargeThreadPool")); - return *LargeWorkerPool; +} // namespace + +WorkerThreadPool& +GetLargeWorkerPool(EWorkloadType WorkloadType) +{ + return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstLargeWorkerPool : BackgroundLargeWorkerPool); } WorkerThreadPool& -GetMediumWorkerPool() +GetMediumWorkerPool(EWorkloadType WorkloadType) { - { - RwLock::SharedLockScope _(PoolLock); - if (MediumWorkerPool) - { - return *MediumWorkerPool; - } - } - RwLock::ExclusiveLockScope _(PoolLock); - ZEN_ASSERT(!IsShutDown); - if (MediumWorkerPool) - { - return *MediumWorkerPool; - } - MediumWorkerPool.reset(new WorkerThreadPool(MediumWorkerThreadPoolTreadCount, "MediumThreadPool")); - return *MediumWorkerPool; + return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstMediumWorkerPool : BackgroundMediumWorkerPool); } WorkerThreadPool& -GetSmallWorkerPool() +GetSmallWorkerPool(EWorkloadType WorkloadType) { - { - RwLock::SharedLockScope _(PoolLock); - if (SmallWorkerPool) - { - return *SmallWorkerPool; - } - } - RwLock::ExclusiveLockScope _(PoolLock); - ZEN_ASSERT(!IsShutDown); - if (SmallWorkerPool) - { - return *SmallWorkerPool; - } - SmallWorkerPool.reset(new WorkerThreadPool(SmallWorkerThreadPoolTreadCount, "SmallThreadPool")); - return *SmallWorkerPool; + return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstSmallWorkerPool : BackgroundSmallWorkerPool); } WorkerThreadPool& GetSyncWorkerPool() { - { - RwLock::SharedLockScope _(PoolLock); - if (SyncWorkerPool) - { - return *SyncWorkerPool; - } - } - RwLock::ExclusiveLockScope _(PoolLock); - ZEN_ASSERT(!IsShutDown); - if (SyncWorkerPool) - { - return *SyncWorkerPool; - } - SyncWorkerPool.reset(new WorkerThreadPool(0, "SyncThreadPool")); - return *SyncWorkerPool; + return EnsurePoolPtr(SyncWorkerPool); } void @@ -110,9 +85,12 @@ ShutdownWorkerPools() { RwLock::ExclusiveLockScope _(PoolLock); IsShutDown = true; - LargeWorkerPool.reset(); - MediumWorkerPool.reset(); - SmallWorkerPool.reset(); - SyncWorkerPool.reset(); + BurstLargeWorkerPool.Pool.reset(); + BackgroundLargeWorkerPool.Pool.reset(); + BurstMediumWorkerPool.Pool.reset(); + BackgroundMediumWorkerPool.Pool.reset(); + BurstSmallWorkerPool.Pool.reset(); + BackgroundSmallWorkerPool.Pool.reset(); + SyncWorkerPool.Pool.reset(); } } // namespace zen |