diff options
| author | Dan Engelbrecht <[email protected]> | 2024-03-28 14:56:20 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-03-28 14:56:20 +0100 |
| commit | 76ac4d541c603dd869e18cfbc6644ebf6c6e22d7 (patch) | |
| tree | f919560d5ddb5a33057f9337d930ca327149dafd /src | |
| parent | add "fieldnames" query param for GetProjectFiles/GetProjectChunkInfos (#29) (diff) | |
| download | zen-76ac4d541c603dd869e18cfbc6644ebf6c6e22d7.tar.xz zen-76ac4d541c603dd869e18cfbc6644ebf6c6e22d7.zip | |
Use multithreading to fetch size/rawsize of entries in `/prj/{project}/oplog/{log}/chunkinfos` and `/prj/{project}/oplog/{log}/files` (#30)
- Improvement: Use multithreading to fetch size/rawsize of entries in `/prj/{project}/oplog/{log}/chunkinfos` and `/prj/{project}/oplog/{log}/files`
- Improvement: Add `GetMediumWorkerPool()` in addition to `GetLargeWorkerPool()` and `GetSmallWorkerPool()`
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 219 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 4 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/workerpools.h | 7 | ||||
| -rw-r--r-- | src/zenutil/workerpools.cpp | 27 |
8 files changed, 198 insertions, 68 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index f01134fa5..dd390d08c 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -20,6 +20,7 @@ #include <zenstore/scrubcontext.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/packageformat.h> +#include <zenutil/workerpools.h> #include "fileremoteprojectstore.h" #include "jupiterremoteprojectstore.h" @@ -844,6 +845,15 @@ ProjectStore::Oplog::ReplayLog() } IoBuffer +ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) +{ + IoBuffer Chunk = m_CidStore.FindChunkByCid(RawHash); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; +} + +IoBuffer ProjectStore::Oplog::FindChunk(const Oid& ChunkId) { RwLock::SharedLockScope OplogLock(m_OplogLock); @@ -2532,49 +2542,101 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize"); const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); - CbObjectWriter Response; - Response.BeginArray("files"sv); + std::vector<Oid> Ids; + std::vector<std::string> ServerPaths; + std::vector<std::string> ClientPaths; + std::vector<uint64_t> Sizes; + std::vector<uint64_t> RawSizes; + size_t Count = 0; FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { + if (WantsIdField || WantsRawSizeField || WantsSizeField) + { + Ids.push_back(Id); + } + if (WantsServerPathField) + { + ServerPaths.push_back(std::string(ServerPath)); + } + if (WantsClientPathField) + { + ClientPaths.push_back(std::string(ClientPath)); + } + Count++; + }); + if (WantsRawSizeField || WantsSizeField) + { + if (WantsSizeField) + { + Sizes.resize(Ids.size(), 0u); + } + if (WantsRawSizeField) + { + RawSizes.resize(Ids.size(), 0u); + } + + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // 
GetSyncWorkerPool(); + Latch WorkLatch(1); + + for (size_t Index = 0; Index < Ids.size(); Index++) + { + WorkLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&WorkLatch, FoundLog, WantsSizeField, WantsRawSizeField, &Sizes, &RawSizes, Index, ChunkId = Ids[Index]]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (IoBuffer Chunk = FoundLog->FindChunk(ChunkId)) + { + uint64_t Size = Chunk.GetSize(); + if (WantsRawSizeField) + { + uint64_t RawSize = Size; + if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash __; + (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); + } + RawSizes[Index] = RawSize; + } + if (WantsSizeField) + { + Sizes[Index] = Size; + } + } + }); + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + } + + CbObjectWriter Response; + Response.BeginArray("files"sv); + for (size_t Index = 0; Index < Count; Index++) + { Response.BeginObject(); if (WantsIdField) { - Response << "id"sv << Id; + Response << "id"sv << Ids[Index]; + } + if (WantsServerPathField) + { + Response << "serverpath"sv << ServerPaths[Index]; } if (WantsClientPathField) { - Response << "clientpath"sv << ClientPath; + Response << "clientpath"sv << ClientPaths[Index]; } - if (WantsServerPathField && !ServerPath.empty()) + if (WantsSizeField) { - Response << "serverpath"sv << ServerPath; + Response << "size"sv << Sizes[Index]; } - if (WantsRawSizeField || WantsSizeField) + if (WantsRawSizeField) { - IoBuffer Chunk = FoundLog->FindChunk(Id); - if (WantsSizeField) - { - Response << "size"sv << Chunk.GetSize(); - } - if (WantsRawSizeField) - { - if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) - { - IoHash _; - uint64_t RawSize = 0; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), _, RawSize); - Response << "rawsize"sv << RawSize; - } - else - { - Response << "rawsize"sv << Chunk.GetSize(); - } - } + Response << "rawsize"sv << RawSizes[Index]; } Response.EndObject(); - }); - + } 
Response.EndArray(); + OutPayload = Response.Save(); return {HttpResponseCode::OK, {}}; } @@ -2603,9 +2665,6 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, } Project->TouchOplog(OplogId); - std::vector<std::pair<Oid, IoHash>> ChunkInfos; - FoundLog->IterateChunkMap([&ChunkInfos](const Oid& Id, const IoHash& Hash) { ChunkInfos.push_back({Id, Hash}); }); - const bool WantsAllFields = WantedFieldNames.empty(); const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id"); @@ -2613,45 +2672,89 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize"); const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); + std::vector<Oid> Ids; + std::vector<IoHash> Hashes; + std::vector<uint64_t> RawSizes; + std::vector<uint64_t> Sizes; + + size_t Count = 0; + FoundLog->IterateChunkMap([&](const Oid& Id, const IoHash& Hash) { + if (WantsIdField) + { + Ids.push_back(Id); + } + if (WantsRawHashField || WantsRawSizeField || WantsSizeField) + { + Hashes.push_back(Hash); + } + Count++; + }); + + if (WantsRawSizeField || WantsSizeField) + { + if (WantsRawSizeField) + { + RawSizes.resize(Hashes.size(), 0u); + } + if (WantsSizeField) + { + Sizes.resize(Hashes.size(), 0u); + } + + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // GetSyncWorkerPool(); + Latch WorkLatch(1); + + for (size_t Index = 0; Index < Hashes.size(); Index++) + { + WorkLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&WorkLatch, FoundLog, WantsSizeField, WantsRawSizeField, &Sizes, &RawSizes, Index, RawHash = Hashes[Index]]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (IoBuffer Chunk = FoundLog->GetChunkByRawHash(RawHash)) + { + uint64_t Size = Chunk.GetSize(); + if (WantsRawSizeField) + { + uint64_t RawSize = Size; + if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash __; + 
(void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); + } + RawSizes[Index] = RawSize; + } + if (WantsSizeField) + { + Sizes[Index] = Size; + } + } + }); + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + } + CbObjectWriter Response; Response.BeginArray("chunkinfos"sv); - for (const auto& ChunkInfo : ChunkInfos) + for (size_t Index = 0; Index < Count; Index++) { Response.BeginObject(); if (WantsIdField) { - Response << "id"sv << ChunkInfo.first; + Response << "id"sv << Ids[Index]; } if (WantsRawHashField) { - Response << "rawhash"sv << ChunkInfo.second; + Response << "rawhash"sv << Hashes[Index]; } - - if (WantsRawSizeField || WantsSizeField) + if (WantsSizeField) { - if (IoBuffer Chunk = FoundLog->FindChunk(ChunkInfo.first)) - { - uint64_t Size = Chunk.GetSize(); - if (WantsSizeField) - { - Response << "size"sv << Size; - } - if (WantsRawSizeField) - { - if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) - { - IoHash _; - uint64_t RawSize = 0; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), _, RawSize); - Response << "rawsize"sv << RawSize; - } - else - { - Response << "rawsize"sv << Size; - } - } - } + Response << "size"sv << Sizes[Index]; + } + if (WantsRawSizeField) + { + Response << "rawsize"sv << RawSizes[Index]; } Response.EndObject(); } diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index e07853ca8..e27bf6e49 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -102,6 +102,7 @@ public: int GetMaxOpIndex() const; IoBuffer FindChunk(const Oid& ChunkId); + IoBuffer GetChunkByRawHash(const IoHash& RawHash); inline static const uint32_t kInvalidOp = ~0u; diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 3887272bf..ae4777278 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp 
@@ -1711,7 +1711,7 @@ SaveOplog(CidStore& ChunkStore, UploadInfo Info; WorkerThreadPool& WorkerPool = GetLargeWorkerPool(); - WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(); + WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(); const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); @@ -2239,7 +2239,7 @@ LoadOplog(CidStore& ChunkStore, Stopwatch Timer; WorkerThreadPool& WorkerPool = GetLargeWorkerPool(); - WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(); + WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(); std::unordered_set<IoHash, IoHash::Hasher> Attachments; diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 93c841e46..d897e26ce 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -3620,7 +3620,7 @@ ZenCacheDiskLayer::Flush() } } { - WorkerThreadPool& Pool = GetSmallWorkerPool(); + WorkerThreadPool& Pool = GetMediumWorkerPool(); Latch WorkLatch(1); try { diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 4f137744b..a1a4a0acc 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -107,7 +107,7 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig) // Initialize payload storage { - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); + WorkerThreadPool& WorkerPool = GetMediumWorkerPool(); std::vector<std::future<void>> Work; Work.emplace_back( WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }})); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index f6469c51d..1a34019fb 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -618,7 +618,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) std::unordered_map<std::unique_ptr<GcStoreCompactor>, GcCompactStoreStats*> StoreCompactors; RwLock StoreCompactorsLock; - WorkerThreadPool& ThreadPool = Settings.SingleThread ? 
GetSyncWorkerPool() : GetSmallWorkerPool(); + WorkerThreadPool& ThreadPool = Settings.SingleThread ? GetSyncWorkerPool() : GetMediumWorkerPool(); ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size()); if (!m_GcReferencers.empty()) @@ -2019,7 +2019,7 @@ GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds Time Stopwatch Timer; ZEN_INFO("scrubbing STARTING (delete mode => {}, skip CID => {})", DoDelete, SkipCid); - WorkerThreadPool& ThreadPool = GetSmallWorkerPool(); + WorkerThreadPool& ThreadPool = GetMediumWorkerPool(); ScrubContext Ctx{ThreadPool, Deadline}; try diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h index 339120ece..78a8e5c5e 100644 --- a/src/zenutil/include/zenutil/workerpools.h +++ b/src/zenutil/include/zenutil/workerpools.h @@ -6,10 +6,13 @@ namespace zen { -// Worker pool with std::thread::hardware_concurrency() worker threads +// Worker pool with std::thread::hardware_concurrency() worker threads, but at least one thread WorkerThreadPool& GetLargeWorkerPool(); -// Worker pool with std::thread::hardware_concurrency() / 4 worker threads +// Worker pool with std::thread::hardware_concurrency() / 4 worker threads, but at least one thread +WorkerThreadPool& GetMediumWorkerPool(); + +// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread WorkerThreadPool& GetSmallWorkerPool(); // Special worker pool that does not use worker thread but issues all scheduled work on the calling thread diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp index 3ae302064..939f3a1c4 100644 --- a/src/zenutil/workerpools.cpp +++ b/src/zenutil/workerpools.cpp @@ -11,14 +11,16 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { namespace { - const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency()); - const int SmallWorkerThreadPoolTreadCount = 
gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 1u)); + const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency()); + const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 1u)); + const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u)); static bool IsShutDown = false; RwLock PoolLock; std::unique_ptr<WorkerThreadPool> LargeWorkerPool; + std::unique_ptr<WorkerThreadPool> MediumWorkerPool; std::unique_ptr<WorkerThreadPool> SmallWorkerPool; std::unique_ptr<WorkerThreadPool> SyncWorkerPool; } // namespace @@ -44,6 +46,26 @@ GetLargeWorkerPool() } WorkerThreadPool& +GetMediumWorkerPool() +{ + { + RwLock::SharedLockScope _(PoolLock); + if (MediumWorkerPool) + { + return *MediumWorkerPool; + } + } + RwLock::ExclusiveLockScope _(PoolLock); + ZEN_ASSERT(!IsShutDown); + if (MediumWorkerPool) + { + return *MediumWorkerPool; + } + MediumWorkerPool.reset(new WorkerThreadPool(MediumWorkerThreadPoolTreadCount, "MediumThreadPool")); + return *MediumWorkerPool; +} + +WorkerThreadPool& GetSmallWorkerPool() { { @@ -89,6 +111,7 @@ ShutdownWorkerPools() RwLock::ExclusiveLockScope _(PoolLock); IsShutDown = true; LargeWorkerPool.reset(); + MediumWorkerPool.reset(); SmallWorkerPool.reset(); SyncWorkerPool.reset(); } |