diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-26 14:10:20 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-26 14:10:20 +0200 |
| commit | 8479e339cbf0b133f97a93b110d95fd8674916d3 (patch) | |
| tree | 6a51a13cfccc77becdb53df3de40475308e470a4 /src | |
| parent | 5.5.0 (diff) | |
| download | zen-8479e339cbf0b133f97a93b110d95fd8674916d3.tar.xz zen-8479e339cbf0b133f97a93b110d95fd8674916d3.zip | |
oplog iterate chunks content type (#65)
- Bugfix: Properly set content type of chunks fetch from CidStore
- Improvement: Add IterateChunks(std::span<Oid>) for better performance in get oplog
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 5 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 152 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 14 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 27 |
4 files changed, 144 insertions, 54 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index f2bf5b353..83038372e 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -1205,11 +1205,6 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) Op.IterateAttachments([&](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsAttachment(); IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); - - // We force this for now as content type is not consistently tracked (will - // be fixed in CidStore refactor) - Payload.SetContentType(ZenContentType::kCompressedBinary); - if (Payload) { switch (Payload.GetContentType()) diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index b36c8caa0..1e53dfd94 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -880,10 +880,7 @@ ProjectStore::Oplog::ReplayLog() IoBuffer ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) { - IoBuffer Chunk = m_CidStore.FindChunkByCid(RawHash); - Chunk.SetContentType(ZenContentType::kCompressedBinary); - - return Chunk; + return m_CidStore.FindChunkByCid(RawHash); } bool @@ -894,6 +891,97 @@ ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes, return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool); } +bool +ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) +{ + std::vector<size_t> CidChunkIndexes; + std::vector<IoHash> CidChunkHashes; + std::vector<size_t> FileChunkIndexes; + std::vector<std::filesystem::path> FileChunkPaths; + { + RwLock::SharedLockScope OplogLock(m_OplogLock); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkIds.size(); ChunkIndex++) + { + const Oid& ChunkId = ChunkIds[ChunkIndex]; + if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) + { + CidChunkIndexes.push_back(ChunkIndex); + CidChunkHashes.push_back(ChunkIt->second); + } + else if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) + { + CidChunkIndexes.push_back(ChunkIndex); + CidChunkHashes.push_back(ChunkIt->second); + } + else if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) + { + FileChunkIndexes.push_back(ChunkIndex); + FileChunkPaths.emplace_back(m_OuterProject->RootDir / FileIt->second.ServerPath); + } + } + } + m_CidStore.IterateChunks( + CidChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); }, + OptionalWorkerPool); + + if (OptionalWorkerPool) + { + std::atomic_bool Result = true; + Latch WorkLatch(1); + + for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) + { + if (Result.load() == false) + { + break; + } + OptionalWorkerPool->ScheduleWork([&]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (Result.load() == false) + { + return; + } + size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; + const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (Payload) + { + if (!AsyncCallback(FileChunkIndex, Payload)) + { + Result.store(false); + } + } + }); + } + + WorkLatch.CountDown(); + WorkLatch.Wait(); + + return Result.load(); + } + else + { + for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) + { + size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; + const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (Payload) + { + bool Result = AsyncCallback(FileChunkIndex, Payload); + if (!Result) + { + return false; + } + } + } + } + return true; +} + IoBuffer ProjectStore::Oplog::FindChunk(const Oid& ChunkId) { @@ -908,10 +996,7 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId) IoHash ChunkHash = ChunkIt->second; OplogLock.ReleaseNow(); - IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); - Chunk.SetContentType(ZenContentType::kCompressedBinary); - - return Chunk; + return m_CidStore.FindChunkByCid(ChunkHash); } if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) @@ -920,10 +1005,7 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId) OplogLock.ReleaseNow(); - IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath); - FileChunk.SetContentType(ZenContentType::kBinary); - - return FileChunk; + return IoBufferBuilder::MakeFromFile(FilePath); } if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) @@ -931,10 +1013,7 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId) IoHash ChunkHash = MetaIt->second; OplogLock.ReleaseNow(); - IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); - Chunk.SetContentType(ZenContentType::kCompressedBinary); - - return Chunk; + return m_CidStore.FindChunkByCid(ChunkHash); } return {}; @@ -2651,37 +2730,30 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, RawSizes.resize(Ids.size(), 0u); } - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // GetSyncWorkerPool(); - Latch WorkLatch(1); - for (size_t Index = 0; Index < Ids.size(); Index++) { - WorkLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&WorkLatch, FoundLog, WantsSizeField, WantsRawSizeField, &Sizes, &RawSizes, Index, ChunkId = Ids[Index]]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (IoBuffer Chunk = FoundLog->FindChunk(ChunkId)) + FoundLog->IterateChunks( + Ids, + [&](size_t Index, const IoBuffer& Payload) { + uint64_t Size = Payload.GetSize(); + if (WantsRawSizeField) { - uint64_t Size = Chunk.GetSize(); - if (WantsRawSizeField) - { - uint64_t RawSize = Size; - if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) - { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); - } - RawSizes[Index] = RawSize; - } - if (WantsSizeField) + uint64_t RawSize = Size; + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) { - Sizes[Index] = Size; + IoHash __; + (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize); } + RawSizes[Index] = RawSize; } - }); + if (WantsSizeField) + { + Sizes[Index] = Size; + } + return true; + }, + &GetSmallWorkerPool()); } - WorkLatch.CountDown(); - WorkLatch.Wait(); } CbObjectWriter Response; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 9bae0382b..fd8443660 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -102,12 +102,14 @@ public: int GetOpIndexByKey(const Oid& Key); int GetMaxOpIndex() const; - IoBuffer FindChunk(const Oid& ChunkId); - IoBuffer GetChunkByRawHash(const IoHash& RawHash); - bool IterateChunks(std::span<IoHash> RawHashes, - const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); - + IoBuffer FindChunk(const Oid& ChunkId); + IoBuffer GetChunkByRawHash(const IoHash& RawHash); + bool IterateChunks(std::span<IoHash> RawHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool); + bool IterateChunks(std::span<Oid> ChunkIds, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool); inline static const uint32_t kInvalidOp = ~0u; /** Persist a new oplog entry diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 26ec2b10d..45d7dd277 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -365,16 +365,19 @@ CasImpl::FindChunk(const IoHash& ChunkHash) if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash)) { + Found.SetContentType(ZenContentType::kCompressedBinary); return Found; } if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash)) { + Found.SetContentType(ZenContentType::kCompressedBinary); return Found; } if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash)) { + Found.SetContentType(ZenContentType::kCompressedBinary); return Found; } @@ -405,15 +408,33 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, WorkerThreadPool* OptionalWorkerPool) { ZEN_TRACE_CPU("CAS::IterateChunks"); - if (!m_SmallStrategy.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool)) + if (!m_SmallStrategy.IterateChunks( + DecompressedIds, + [&](size_t Index, const IoBuffer& Payload) { + IoBuffer Chunk(Payload); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + return AsyncCallback(Index, Payload); + }, + OptionalWorkerPool)) { return false; } - if (!m_TinyStrategy.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool)) + if (!m_TinyStrategy.IterateChunks( + DecompressedIds, + [&](size_t Index, const IoBuffer& Payload) { + IoBuffer Chunk(Payload); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + return AsyncCallback(Index, Payload); + }, + OptionalWorkerPool)) { return false; } - if (!m_LargeStrategy.IterateChunks(DecompressedIds, AsyncCallback)) + if (!m_LargeStrategy.IterateChunks(DecompressedIds, [&](size_t Index, const IoBuffer& Payload) { + IoBuffer Chunk(Payload); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + return AsyncCallback(Index, Payload); + })) { return false; } |