aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-08-22 16:03:01 +0200
committerGitHub Enterprise <[email protected]>2024-08-22 16:03:01 +0200
commit203d3f03f0d0ef51f414b5344462bde0a8fcaf1b (patch)
tree2d0cbe07dbf6b2d81a91e15c823ea0209205b39c /src
parentsafer calls to IsProcessRunning (#131) (diff)
downloadzen-203d3f03f0d0ef51f414b5344462bde0a8fcaf1b.tar.xz
zen-203d3f03f0d0ef51f414b5344462bde0a8fcaf1b.zip
separate worker pools into burst/background to avoid background jobs blocking client requests (#134)
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp4
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp10
-rw-r--r--src/zenserver/workspaces/httpworkspaces.cpp12
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp4
-rw-r--r--src/zenstore/cas.cpp2
-rw-r--r--src/zenstore/gc.cpp4
-rw-r--r--src/zenutil/include/zenutil/workerpools.h12
-rw-r--r--src/zenutil/workerpools.cpp118
8 files changed, 76 insertions, 90 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index dd007c8b4..803980b2b 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3227,7 +3227,7 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
}
return true;
},
- &GetSmallWorkerPool());
+ &GetSmallWorkerPool(EWorkloadType::Burst));
}
CbObjectWriter Response;
@@ -3323,7 +3323,7 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
Sizes.resize(Hashes.size(), 0u);
}
- WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // GetSyncWorkerPool();
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();
(void)FoundLog->IterateChunks(
Hashes,
[&](size_t Index, const IoBuffer& Chunk) -> bool {
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 15d329442..ab31d5ec5 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -1418,7 +1418,7 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
{
- WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
+ WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
AsyncRemoteResult RemoteResult;
CbObject ContainerObject = BuildContainer(ChunkStore,
@@ -1777,8 +1777,8 @@ SaveOplog(CidStore& ChunkStore,
UploadInfo Info;
- WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
- WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool();
+ WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
+ WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
@@ -2380,8 +2380,8 @@ LoadOplog(CidStore& ChunkStore,
Stopwatch Timer;
- WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
- WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool();
+ WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
+ WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(EWorkloadType::Background);
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
uint64_t BlockCountToDownload = 0;
diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp
index 7242b2fba..6a4e9c466 100644
--- a/src/zenserver/workspaces/httpworkspaces.cpp
+++ b/src/zenserver/workspaces/httpworkspaces.cpp
@@ -662,7 +662,7 @@ HttpWorkspacesService::FilesRequest(HttpRouterRequest& Req, const Oid& Workspace
const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size");
std::optional<std::vector<Workspaces::ShareFile>> Files =
- m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool());
+ m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool(EWorkloadType::Burst));
if (!Files.has_value())
{
return ServerRequest.WriteResponse(HttpResponseCode::NotFound);
@@ -706,7 +706,8 @@ void
HttpWorkspacesService::ChunkInfoRequest(HttpRouterRequest& Req, const Oid& WorkspaceId, const Oid& ShareId, const Oid& ChunkId)
{
HttpServerRequest& ServerRequest = Req.ServerRequest();
- Workspaces::ShareFile File = m_Workspaces.GetWorkspaceShareChunkInfo(WorkspaceId, ShareId, ChunkId, GetSmallWorkerPool());
+ Workspaces::ShareFile File =
+ m_Workspaces.GetWorkspaceShareChunkInfo(WorkspaceId, ShareId, ChunkId, GetSmallWorkerPool(EWorkloadType::Burst));
if (File.Id != Oid::Zero)
{
CbObjectWriter Response;
@@ -738,7 +739,8 @@ HttpWorkspacesService::BatchRequest(HttpRouterRequest& Req, const Oid& Workspace
[](const RequestChunkEntry& Entry) {
return Workspaces::ChunkRequest{.ChunkId = Entry.ChunkId, .Offset = Entry.Offset, .Size = Entry.RequestBytes};
});
- std::vector<IoBuffer> Chunks = m_Workspaces.GetWorkspaceShareChunks(WorkspaceId, ShareId, Requests, GetSmallWorkerPool());
+ std::vector<IoBuffer> Chunks =
+ m_Workspaces.GetWorkspaceShareChunks(WorkspaceId, ShareId, Requests, GetSmallWorkerPool(EWorkloadType::Burst));
if (Chunks.empty())
{
return ServerRequest.WriteResponse(HttpResponseCode::NotFound);
@@ -794,7 +796,7 @@ HttpWorkspacesService::EntriesRequest(HttpRouterRequest& Req, const Oid& Workspa
m_WorkspacesStats.WorkspaceShareEntriesReadCount++;
std::optional<std::vector<Workspaces::ShareFile>> Files =
- m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool());
+ m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool(EWorkloadType::Burst));
if (!Files.has_value())
{
return ServerRequest.WriteResponse(HttpResponseCode::NotFound);
@@ -900,7 +902,7 @@ HttpWorkspacesService::ChunkRequest(HttpRouterRequest& Req, const Oid& Workspace
WorkspaceId,
ShareId,
std::vector<Workspaces::ChunkRequest>{Workspaces::ChunkRequest{.ChunkId = ChunkId, .Offset = Offset, .Size = Size}},
- GetSmallWorkerPool());
+ GetSmallWorkerPool(EWorkloadType::Burst));
if (!Response.empty() && Response[0])
{
m_WorkspacesStats.WorkspaceShareChunkHitCount++;
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 67910fa7f..5d167fc47 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -4035,7 +4035,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
RwLock SyncLock;
- WorkerThreadPool& Pool = GetLargeWorkerPool();
+ WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
Latch WorkLatch(1);
for (auto& BucketPath : FoundBucketDirectories)
{
@@ -4149,7 +4149,7 @@ ZenCacheDiskLayer::Flush()
}
}
{
- WorkerThreadPool& Pool = GetMediumWorkerPool();
+ WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
Latch WorkLatch(1);
try
{
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index f300c08e3..871558a52 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -113,7 +113,7 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig)
// Initialize payload storage
{
- WorkerThreadPool& WorkerPool = GetMediumWorkerPool();
+ WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Burst);
std::vector<std::future<void>> Work;
Work.emplace_back(
WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }}));
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 3f3adeb82..d6bf99c3e 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -632,7 +632,7 @@ GcManager::CollectGarbage(const GcSettings& Settings)
std::unordered_map<std::unique_ptr<GcStoreCompactor>, GcCompactStoreStats*> StoreCompactors;
RwLock StoreCompactorsLock;
- WorkerThreadPool& ThreadPool = Settings.SingleThread ? GetSyncWorkerPool() : GetMediumWorkerPool();
+ WorkerThreadPool& ThreadPool = Settings.SingleThread ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Background);
ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size());
if (!m_GcReferencers.empty())
@@ -2097,7 +2097,7 @@ GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds Time
Stopwatch Timer;
ZEN_INFO("scrubbing STARTING (delete mode => {}, skip CID => {})", DoDelete, SkipCid);
- WorkerThreadPool& ThreadPool = GetMediumWorkerPool();
+ WorkerThreadPool& ThreadPool = GetMediumWorkerPool(EWorkloadType::Background);
ScrubContext Ctx{ThreadPool, Deadline};
try
diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h
index 78a8e5c5e..9683ad720 100644
--- a/src/zenutil/include/zenutil/workerpools.h
+++ b/src/zenutil/include/zenutil/workerpools.h
@@ -6,14 +6,20 @@
namespace zen {
+enum class EWorkloadType
+{
+ Burst, // Used when you want to respond quickly, such as requests from a client
+ Background // Used for background jobs such as GC/Scrub/oplog import-export
+};
+
// Worker pool with std::thread::hardware_concurrency() worker threads, but at least one thread
-WorkerThreadPool& GetLargeWorkerPool();
+WorkerThreadPool& GetLargeWorkerPool(EWorkloadType WorkloadType);
// Worker pool with std::thread::hardware_concurrency() / 4 worker threads, but at least one thread
-WorkerThreadPool& GetMediumWorkerPool();
+WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType);
// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread
-WorkerThreadPool& GetSmallWorkerPool();
+WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType);
// Special worker pool that does not use worker thread but issues all scheduled work on the calling thread
// This is useful for debugging when multiple async thread can make stepping in debugger complicated
diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp
index 144ef6817..e3165e838 100644
--- a/src/zenutil/workerpools.cpp
+++ b/src/zenutil/workerpools.cpp
@@ -19,90 +19,65 @@ namespace {
RwLock PoolLock;
- std::unique_ptr<WorkerThreadPool> LargeWorkerPool;
- std::unique_ptr<WorkerThreadPool> MediumWorkerPool;
- std::unique_ptr<WorkerThreadPool> SmallWorkerPool;
- std::unique_ptr<WorkerThreadPool> SyncWorkerPool;
-} // namespace
+ struct WorkerPool
+ {
+ std::unique_ptr<WorkerThreadPool> Pool;
+ const int TreadCount;
+ const std::string_view Name;
+ };
-WorkerThreadPool&
-GetLargeWorkerPool()
-{
+ WorkerPool BurstLargeWorkerPool = {.TreadCount = LargeWorkerThreadPoolTreadCount, .Name = "LargeThreadPool(burst)"};
+ WorkerPool BackgroundLargeWorkerPool = {.TreadCount = LargeWorkerThreadPoolTreadCount, .Name = "LargeThreadPool(bkg)"};
+
+ WorkerPool BurstMediumWorkerPool = {.TreadCount = MediumWorkerThreadPoolTreadCount, .Name = "MediumThreadPool(burst)"};
+ WorkerPool BackgroundMediumWorkerPool = {.TreadCount = MediumWorkerThreadPoolTreadCount, .Name = "MediumThreadPool(bkg)"};
+
+ WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"};
+ WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"};
+
+ WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"};
+
+ WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool)
{
- RwLock::SharedLockScope _(PoolLock);
- if (LargeWorkerPool)
{
- return *LargeWorkerPool;
+ RwLock::SharedLockScope _(PoolLock);
+ if (Pool.Pool)
+ {
+ return *Pool.Pool;
+ }
}
+ RwLock::ExclusiveLockScope _(PoolLock);
+ ZEN_ASSERT(!IsShutDown);
+ if (!Pool.Pool)
+ {
+ Pool.Pool.reset(new WorkerThreadPool(Pool.TreadCount, Pool.Name));
+ }
+ return *Pool.Pool;
}
- RwLock::ExclusiveLockScope _(PoolLock);
- ZEN_ASSERT(!IsShutDown);
- if (LargeWorkerPool)
- {
- return *LargeWorkerPool;
- }
- LargeWorkerPool.reset(new WorkerThreadPool(LargeWorkerThreadPoolTreadCount, "LargeThreadPool"));
- return *LargeWorkerPool;
+} // namespace
+
+WorkerThreadPool&
+GetLargeWorkerPool(EWorkloadType WorkloadType)
+{
+ return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstLargeWorkerPool : BackgroundLargeWorkerPool);
}
WorkerThreadPool&
-GetMediumWorkerPool()
+GetMediumWorkerPool(EWorkloadType WorkloadType)
{
- {
- RwLock::SharedLockScope _(PoolLock);
- if (MediumWorkerPool)
- {
- return *MediumWorkerPool;
- }
- }
- RwLock::ExclusiveLockScope _(PoolLock);
- ZEN_ASSERT(!IsShutDown);
- if (MediumWorkerPool)
- {
- return *MediumWorkerPool;
- }
- MediumWorkerPool.reset(new WorkerThreadPool(MediumWorkerThreadPoolTreadCount, "MediumThreadPool"));
- return *MediumWorkerPool;
+ return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstMediumWorkerPool : BackgroundMediumWorkerPool);
}
WorkerThreadPool&
-GetSmallWorkerPool()
+GetSmallWorkerPool(EWorkloadType WorkloadType)
{
- {
- RwLock::SharedLockScope _(PoolLock);
- if (SmallWorkerPool)
- {
- return *SmallWorkerPool;
- }
- }
- RwLock::ExclusiveLockScope _(PoolLock);
- ZEN_ASSERT(!IsShutDown);
- if (SmallWorkerPool)
- {
- return *SmallWorkerPool;
- }
- SmallWorkerPool.reset(new WorkerThreadPool(SmallWorkerThreadPoolTreadCount, "SmallThreadPool"));
- return *SmallWorkerPool;
+ return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstSmallWorkerPool : BackgroundSmallWorkerPool);
}
WorkerThreadPool&
GetSyncWorkerPool()
{
- {
- RwLock::SharedLockScope _(PoolLock);
- if (SyncWorkerPool)
- {
- return *SyncWorkerPool;
- }
- }
- RwLock::ExclusiveLockScope _(PoolLock);
- ZEN_ASSERT(!IsShutDown);
- if (SyncWorkerPool)
- {
- return *SyncWorkerPool;
- }
- SyncWorkerPool.reset(new WorkerThreadPool(0, "SyncThreadPool"));
- return *SyncWorkerPool;
+ return EnsurePoolPtr(SyncWorkerPool);
}
void
@@ -110,9 +85,12 @@ ShutdownWorkerPools()
{
RwLock::ExclusiveLockScope _(PoolLock);
IsShutDown = true;
- LargeWorkerPool.reset();
- MediumWorkerPool.reset();
- SmallWorkerPool.reset();
- SyncWorkerPool.reset();
+ BurstLargeWorkerPool.Pool.reset();
+ BackgroundLargeWorkerPool.Pool.reset();
+ BurstMediumWorkerPool.Pool.reset();
+ BackgroundMediumWorkerPool.Pool.reset();
+ BurstSmallWorkerPool.Pool.reset();
+ BackgroundSmallWorkerPool.Pool.reset();
+ SyncWorkerPool.Pool.reset();
}
} // namespace zen