aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-03-28 14:56:20 +0100
committerGitHub Enterprise <[email protected]>2024-03-28 14:56:20 +0100
commit76ac4d541c603dd869e18cfbc6644ebf6c6e22d7 (patch)
treef919560d5ddb5a33057f9337d930ca327149dafd /src
parentadd "fieldnames" query param for GetProjectFiles/GetProjectChunkInfos (#29) (diff)
downloadzen-76ac4d541c603dd869e18cfbc6644ebf6c6e22d7.tar.xz
zen-76ac4d541c603dd869e18cfbc6644ebf6c6e22d7.zip
Use multithreading to fetch size/rawsize of entries in `/prj/{project}/oplog/{log}/chunkinfos` and `/prj/{project}/oplog/{log}/files` (#30)
- Improvement: Use multithreading to fetch size/rawsize of entries in `/prj/{project}/oplog/{log}/chunkinfos` and `/prj/{project}/oplog/{log}/files` - Improvement: Add `GetMediumWorkerPool()` in addition to `GetLargeWorkerPool()` and `GetSmallWorkerPool()`
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp219
-rw-r--r--src/zenserver/projectstore/projectstore.h1
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp4
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp2
-rw-r--r--src/zenstore/cas.cpp2
-rw-r--r--src/zenstore/gc.cpp4
-rw-r--r--src/zenutil/include/zenutil/workerpools.h7
-rw-r--r--src/zenutil/workerpools.cpp27
8 files changed, 198 insertions, 68 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index f01134fa5..dd390d08c 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -20,6 +20,7 @@
#include <zenstore/scrubcontext.h>
#include <zenutil/cache/rpcrecording.h>
#include <zenutil/packageformat.h>
+#include <zenutil/workerpools.h>
#include "fileremoteprojectstore.h"
#include "jupiterremoteprojectstore.h"
@@ -844,6 +845,15 @@ ProjectStore::Oplog::ReplayLog()
}
IoBuffer
+ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash)
+{
+ IoBuffer Chunk = m_CidStore.FindChunkByCid(RawHash);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+
+ return Chunk;
+}
+
+IoBuffer
ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
{
RwLock::SharedLockScope OplogLock(m_OplogLock);
@@ -2532,49 +2542,101 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize");
const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size");
- CbObjectWriter Response;
- Response.BeginArray("files"sv);
+ std::vector<Oid> Ids;
+ std::vector<std::string> ServerPaths;
+ std::vector<std::string> ClientPaths;
+ std::vector<uint64_t> Sizes;
+ std::vector<uint64_t> RawSizes;
+ size_t Count = 0;
FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
+ if (WantsIdField || WantsRawSizeField || WantsSizeField)
+ {
+ Ids.push_back(Id);
+ }
+ if (WantsServerPathField)
+ {
+ ServerPaths.push_back(std::string(ServerPath));
+ }
+ if (WantsClientPathField)
+ {
+ ClientPaths.push_back(std::string(ClientPath));
+ }
+ Count++;
+ });
+ if (WantsRawSizeField || WantsSizeField)
+ {
+ if (WantsSizeField)
+ {
+ Sizes.resize(Ids.size(), 0u);
+ }
+ if (WantsRawSizeField)
+ {
+ RawSizes.resize(Ids.size(), 0u);
+ }
+
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // GetSyncWorkerPool();
+ Latch WorkLatch(1);
+
+ for (size_t Index = 0; Index < Ids.size(); Index++)
+ {
+ WorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&WorkLatch, FoundLog, WantsSizeField, WantsRawSizeField, &Sizes, &RawSizes, Index, ChunkId = Ids[Index]]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (IoBuffer Chunk = FoundLog->FindChunk(ChunkId))
+ {
+ uint64_t Size = Chunk.GetSize();
+ if (WantsRawSizeField)
+ {
+ uint64_t RawSize = Size;
+ if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash __;
+ (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize);
+ }
+ RawSizes[Index] = RawSize;
+ }
+ if (WantsSizeField)
+ {
+ Sizes[Index] = Size;
+ }
+ }
+ });
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ }
+
+ CbObjectWriter Response;
+ Response.BeginArray("files"sv);
+ for (size_t Index = 0; Index < Count; Index++)
+ {
Response.BeginObject();
if (WantsIdField)
{
- Response << "id"sv << Id;
+ Response << "id"sv << Ids[Index];
+ }
+ if (WantsServerPathField)
+ {
+ Response << "serverpath"sv << ServerPaths[Index];
}
if (WantsClientPathField)
{
- Response << "clientpath"sv << ClientPath;
+ Response << "clientpath"sv << ClientPaths[Index];
}
- if (WantsServerPathField && !ServerPath.empty())
+ if (WantsSizeField)
{
- Response << "serverpath"sv << ServerPath;
+ Response << "size"sv << Sizes[Index];
}
- if (WantsRawSizeField || WantsSizeField)
+ if (WantsRawSizeField)
{
- IoBuffer Chunk = FoundLog->FindChunk(Id);
- if (WantsSizeField)
- {
- Response << "size"sv << Chunk.GetSize();
- }
- if (WantsRawSizeField)
- {
- if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
- {
- IoHash _;
- uint64_t RawSize = 0;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), _, RawSize);
- Response << "rawsize"sv << RawSize;
- }
- else
- {
- Response << "rawsize"sv << Chunk.GetSize();
- }
- }
+ Response << "rawsize"sv << RawSizes[Index];
}
Response.EndObject();
- });
-
+ }
Response.EndArray();
+
OutPayload = Response.Save();
return {HttpResponseCode::OK, {}};
}
@@ -2603,9 +2665,6 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
}
Project->TouchOplog(OplogId);
- std::vector<std::pair<Oid, IoHash>> ChunkInfos;
- FoundLog->IterateChunkMap([&ChunkInfos](const Oid& Id, const IoHash& Hash) { ChunkInfos.push_back({Id, Hash}); });
-
const bool WantsAllFields = WantedFieldNames.empty();
const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id");
@@ -2613,45 +2672,89 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize");
const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size");
+ std::vector<Oid> Ids;
+ std::vector<IoHash> Hashes;
+ std::vector<uint64_t> RawSizes;
+ std::vector<uint64_t> Sizes;
+
+ size_t Count = 0;
+ FoundLog->IterateChunkMap([&](const Oid& Id, const IoHash& Hash) {
+ if (WantsIdField)
+ {
+ Ids.push_back(Id);
+ }
+ if (WantsRawHashField || WantsRawSizeField || WantsSizeField)
+ {
+ Hashes.push_back(Hash);
+ }
+ Count++;
+ });
+
+ if (WantsRawSizeField || WantsSizeField)
+ {
+ if (WantsRawSizeField)
+ {
+ RawSizes.resize(Hashes.size(), 0u);
+ }
+ if (WantsSizeField)
+ {
+ Sizes.resize(Hashes.size(), 0u);
+ }
+
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // GetSyncWorkerPool();
+ Latch WorkLatch(1);
+
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
+ {
+ WorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&WorkLatch, FoundLog, WantsSizeField, WantsRawSizeField, &Sizes, &RawSizes, Index, RawHash = Hashes[Index]]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (IoBuffer Chunk = FoundLog->GetChunkByRawHash(RawHash))
+ {
+ uint64_t Size = Chunk.GetSize();
+ if (WantsRawSizeField)
+ {
+ uint64_t RawSize = Size;
+ if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash __;
+ (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize);
+ }
+ RawSizes[Index] = RawSize;
+ }
+ if (WantsSizeField)
+ {
+ Sizes[Index] = Size;
+ }
+ }
+ });
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ }
+
CbObjectWriter Response;
Response.BeginArray("chunkinfos"sv);
- for (const auto& ChunkInfo : ChunkInfos)
+ for (size_t Index = 0; Index < Count; Index++)
{
Response.BeginObject();
if (WantsIdField)
{
- Response << "id"sv << ChunkInfo.first;
+ Response << "id"sv << Ids[Index];
}
if (WantsRawHashField)
{
- Response << "rawhash"sv << ChunkInfo.second;
+ Response << "rawhash"sv << Hashes[Index];
}
-
- if (WantsRawSizeField || WantsSizeField)
+ if (WantsSizeField)
{
- if (IoBuffer Chunk = FoundLog->FindChunk(ChunkInfo.first))
- {
- uint64_t Size = Chunk.GetSize();
- if (WantsSizeField)
- {
- Response << "size"sv << Size;
- }
- if (WantsRawSizeField)
- {
- if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
- {
- IoHash _;
- uint64_t RawSize = 0;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), _, RawSize);
- Response << "rawsize"sv << RawSize;
- }
- else
- {
- Response << "rawsize"sv << Size;
- }
- }
- }
+ Response << "size"sv << Sizes[Index];
+ }
+ if (WantsRawSizeField)
+ {
+ Response << "rawsize"sv << RawSizes[Index];
}
Response.EndObject();
}
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index e07853ca8..e27bf6e49 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -102,6 +102,7 @@ public:
int GetMaxOpIndex() const;
IoBuffer FindChunk(const Oid& ChunkId);
+ IoBuffer GetChunkByRawHash(const IoHash& RawHash);
inline static const uint32_t kInvalidOp = ~0u;
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 3887272bf..ae4777278 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -1711,7 +1711,7 @@ SaveOplog(CidStore& ChunkStore,
UploadInfo Info;
WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
- WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool();
+ WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool();
const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
@@ -2239,7 +2239,7 @@ LoadOplog(CidStore& ChunkStore,
Stopwatch Timer;
WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
- WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool();
+ WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool();
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 93c841e46..d897e26ce 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -3620,7 +3620,7 @@ ZenCacheDiskLayer::Flush()
}
}
{
- WorkerThreadPool& Pool = GetSmallWorkerPool();
+ WorkerThreadPool& Pool = GetMediumWorkerPool();
Latch WorkLatch(1);
try
{
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 4f137744b..a1a4a0acc 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -107,7 +107,7 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig)
// Initialize payload storage
{
- WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
+ WorkerThreadPool& WorkerPool = GetMediumWorkerPool();
std::vector<std::future<void>> Work;
Work.emplace_back(
WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }}));
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index f6469c51d..1a34019fb 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -618,7 +618,7 @@ GcManager::CollectGarbage(const GcSettings& Settings)
std::unordered_map<std::unique_ptr<GcStoreCompactor>, GcCompactStoreStats*> StoreCompactors;
RwLock StoreCompactorsLock;
- WorkerThreadPool& ThreadPool = Settings.SingleThread ? GetSyncWorkerPool() : GetSmallWorkerPool();
+ WorkerThreadPool& ThreadPool = Settings.SingleThread ? GetSyncWorkerPool() : GetMediumWorkerPool();
ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size());
if (!m_GcReferencers.empty())
@@ -2019,7 +2019,7 @@ GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds Time
Stopwatch Timer;
ZEN_INFO("scrubbing STARTING (delete mode => {}, skip CID => {})", DoDelete, SkipCid);
- WorkerThreadPool& ThreadPool = GetSmallWorkerPool();
+ WorkerThreadPool& ThreadPool = GetMediumWorkerPool();
ScrubContext Ctx{ThreadPool, Deadline};
try
diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h
index 339120ece..78a8e5c5e 100644
--- a/src/zenutil/include/zenutil/workerpools.h
+++ b/src/zenutil/include/zenutil/workerpools.h
@@ -6,10 +6,13 @@
namespace zen {
-// Worker pool with std::thread::hardware_concurrency() worker threads
+// Worker pool with std::thread::hardware_concurrency() worker threads, but at least one thread
WorkerThreadPool& GetLargeWorkerPool();
-// Worker pool with std::thread::hardware_concurrency() / 4 worker threads
+// Worker pool with std::thread::hardware_concurrency() / 4 worker threads, but at least one thread
+WorkerThreadPool& GetMediumWorkerPool();
+
+// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread
WorkerThreadPool& GetSmallWorkerPool();
// Special worker pool that does not use worker thread but issues all scheduled work on the calling thread
diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp
index 3ae302064..939f3a1c4 100644
--- a/src/zenutil/workerpools.cpp
+++ b/src/zenutil/workerpools.cpp
@@ -11,14 +11,16 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
namespace {
- const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency());
- const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 1u));
+ const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency());
+ const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 1u));
+ const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u));
static bool IsShutDown = false;
RwLock PoolLock;
std::unique_ptr<WorkerThreadPool> LargeWorkerPool;
+ std::unique_ptr<WorkerThreadPool> MediumWorkerPool;
std::unique_ptr<WorkerThreadPool> SmallWorkerPool;
std::unique_ptr<WorkerThreadPool> SyncWorkerPool;
} // namespace
@@ -44,6 +46,26 @@ GetLargeWorkerPool()
}
WorkerThreadPool&
+GetMediumWorkerPool()
+{
+ {
+ RwLock::SharedLockScope _(PoolLock);
+ if (MediumWorkerPool)
+ {
+ return *MediumWorkerPool;
+ }
+ }
+ RwLock::ExclusiveLockScope _(PoolLock);
+ ZEN_ASSERT(!IsShutDown);
+ if (MediumWorkerPool)
+ {
+ return *MediumWorkerPool;
+ }
+ MediumWorkerPool.reset(new WorkerThreadPool(MediumWorkerThreadPoolTreadCount, "MediumThreadPool"));
+ return *MediumWorkerPool;
+}
+
+WorkerThreadPool&
GetSmallWorkerPool()
{
{
@@ -89,6 +111,7 @@ ShutdownWorkerPools()
RwLock::ExclusiveLockScope _(PoolLock);
IsShutDown = true;
LargeWorkerPool.reset();
+ MediumWorkerPool.reset();
SmallWorkerPool.reset();
SyncWorkerPool.reset();
}