aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-26 14:10:20 +0200
committerGitHub Enterprise <[email protected]>2024-04-26 14:10:20 +0200
commit8479e339cbf0b133f97a93b110d95fd8674916d3 (patch)
tree6a51a13cfccc77becdb53df3de40475308e470a4 /src
parent5.5.0 (diff)
downloadzen-8479e339cbf0b133f97a93b110d95fd8674916d3.tar.xz
zen-8479e339cbf0b133f97a93b110d95fd8674916d3.zip
oplog iterate chunks content type (#65)
- Bugfix: Properly set content type of chunks fetch from CidStore - Improvement: Add IterateChunks(std::span<Oid>) for better performance in get oplog
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp5
-rw-r--r--src/zenserver/projectstore/projectstore.cpp152
-rw-r--r--src/zenserver/projectstore/projectstore.h14
-rw-r--r--src/zenstore/cas.cpp27
4 files changed, 144 insertions, 54 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index f2bf5b353..83038372e 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -1205,11 +1205,6 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req)
Op.IterateAttachments([&](CbFieldView FieldView) {
const IoHash AttachmentHash = FieldView.AsAttachment();
IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash);
-
- // We force this for now as content type is not consistently tracked (will
- // be fixed in CidStore refactor)
- Payload.SetContentType(ZenContentType::kCompressedBinary);
-
if (Payload)
{
switch (Payload.GetContentType())
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index b36c8caa0..1e53dfd94 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -880,10 +880,7 @@ ProjectStore::Oplog::ReplayLog()
IoBuffer
ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash)
{
- IoBuffer Chunk = m_CidStore.FindChunkByCid(RawHash);
- Chunk.SetContentType(ZenContentType::kCompressedBinary);
-
- return Chunk;
+ return m_CidStore.FindChunkByCid(RawHash);
}
bool
@@ -894,6 +891,97 @@ ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes,
return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool);
}
+bool
+ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool)
+{
+ std::vector<size_t> CidChunkIndexes;
+ std::vector<IoHash> CidChunkHashes;
+ std::vector<size_t> FileChunkIndexes;
+ std::vector<std::filesystem::path> FileChunkPaths;
+ {
+ RwLock::SharedLockScope OplogLock(m_OplogLock);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkIds.size(); ChunkIndex++)
+ {
+ const Oid& ChunkId = ChunkIds[ChunkIndex];
+ if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end())
+ {
+ CidChunkIndexes.push_back(ChunkIndex);
+ CidChunkHashes.push_back(ChunkIt->second);
+ }
+ else if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end())
+ {
+ CidChunkIndexes.push_back(ChunkIndex);
+ CidChunkHashes.push_back(ChunkIt->second);
+ }
+ else if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end())
+ {
+ FileChunkIndexes.push_back(ChunkIndex);
+ FileChunkPaths.emplace_back(m_OuterProject->RootDir / FileIt->second.ServerPath);
+ }
+ }
+ }
+ m_CidStore.IterateChunks(
+ CidChunkHashes,
+ [&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); },
+ OptionalWorkerPool);
+
+ if (OptionalWorkerPool)
+ {
+ std::atomic_bool Result = true;
+ Latch WorkLatch(1);
+
+ for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
+ {
+ if (Result.load() == false)
+ {
+ break;
+ }
+ OptionalWorkerPool->ScheduleWork([&]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (Result.load() == false)
+ {
+ return;
+ }
+ size_t FileChunkIndex = FileChunkIndexes[ChunkIndex];
+ const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex];
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
+ if (Payload)
+ {
+ if (!AsyncCallback(FileChunkIndex, Payload))
+ {
+ Result.store(false);
+ }
+ }
+ });
+ }
+
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+
+ return Result.load();
+ }
+ else
+ {
+ for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
+ {
+ size_t FileChunkIndex = FileChunkIndexes[ChunkIndex];
+ const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex];
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
+ if (Payload)
+ {
+ bool Result = AsyncCallback(FileChunkIndex, Payload);
+ if (!Result)
+ {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+}
+
IoBuffer
ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
{
@@ -908,10 +996,7 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
IoHash ChunkHash = ChunkIt->second;
OplogLock.ReleaseNow();
- IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash);
- Chunk.SetContentType(ZenContentType::kCompressedBinary);
-
- return Chunk;
+ return m_CidStore.FindChunkByCid(ChunkHash);
}
if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end())
@@ -920,10 +1005,7 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
OplogLock.ReleaseNow();
- IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath);
- FileChunk.SetContentType(ZenContentType::kBinary);
-
- return FileChunk;
+ return IoBufferBuilder::MakeFromFile(FilePath);
}
if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end())
@@ -931,10 +1013,7 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
IoHash ChunkHash = MetaIt->second;
OplogLock.ReleaseNow();
- IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash);
- Chunk.SetContentType(ZenContentType::kCompressedBinary);
-
- return Chunk;
+ return m_CidStore.FindChunkByCid(ChunkHash);
}
return {};
@@ -2651,37 +2730,30 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
RawSizes.resize(Ids.size(), 0u);
}
- WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // GetSyncWorkerPool();
- Latch WorkLatch(1);
-
for (size_t Index = 0; Index < Ids.size(); Index++)
{
- WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&WorkLatch, FoundLog, WantsSizeField, WantsRawSizeField, &Sizes, &RawSizes, Index, ChunkId = Ids[Index]]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (IoBuffer Chunk = FoundLog->FindChunk(ChunkId))
+ FoundLog->IterateChunks(
+ Ids,
+ [&](size_t Index, const IoBuffer& Payload) {
+ uint64_t Size = Payload.GetSize();
+ if (WantsRawSizeField)
{
- uint64_t Size = Chunk.GetSize();
- if (WantsRawSizeField)
- {
- uint64_t RawSize = Size;
- if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
- {
- IoHash __;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize);
- }
- RawSizes[Index] = RawSize;
- }
- if (WantsSizeField)
+ uint64_t RawSize = Size;
+ if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
{
- Sizes[Index] = Size;
+ IoHash __;
+ (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize);
}
+ RawSizes[Index] = RawSize;
}
- });
+ if (WantsSizeField)
+ {
+ Sizes[Index] = Size;
+ }
+ return true;
+ },
+ &GetSmallWorkerPool());
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
}
CbObjectWriter Response;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 9bae0382b..fd8443660 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -102,12 +102,14 @@ public:
int GetOpIndexByKey(const Oid& Key);
int GetMaxOpIndex() const;
- IoBuffer FindChunk(const Oid& ChunkId);
- IoBuffer GetChunkByRawHash(const IoHash& RawHash);
- bool IterateChunks(std::span<IoHash> RawHashes,
- const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool);
-
+ IoBuffer FindChunk(const Oid& ChunkId);
+ IoBuffer GetChunkByRawHash(const IoHash& RawHash);
+ bool IterateChunks(std::span<IoHash> RawHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool);
+ bool IterateChunks(std::span<Oid> ChunkIds,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool);
inline static const uint32_t kInvalidOp = ~0u;
/** Persist a new oplog entry
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 26ec2b10d..45d7dd277 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -365,16 +365,19 @@ CasImpl::FindChunk(const IoHash& ChunkHash)
if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash))
{
+ Found.SetContentType(ZenContentType::kCompressedBinary);
return Found;
}
if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash))
{
+ Found.SetContentType(ZenContentType::kCompressedBinary);
return Found;
}
if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash))
{
+ Found.SetContentType(ZenContentType::kCompressedBinary);
return Found;
}
@@ -405,15 +408,33 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
WorkerThreadPool* OptionalWorkerPool)
{
ZEN_TRACE_CPU("CAS::IterateChunks");
- if (!m_SmallStrategy.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool))
+ if (!m_SmallStrategy.IterateChunks(
+ DecompressedIds,
+ [&](size_t Index, const IoBuffer& Payload) {
+ IoBuffer Chunk(Payload);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+ return AsyncCallback(Index, Payload);
+ },
+ OptionalWorkerPool))
{
return false;
}
- if (!m_TinyStrategy.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool))
+ if (!m_TinyStrategy.IterateChunks(
+ DecompressedIds,
+ [&](size_t Index, const IoBuffer& Payload) {
+ IoBuffer Chunk(Payload);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+ return AsyncCallback(Index, Payload);
+ },
+ OptionalWorkerPool))
{
return false;
}
- if (!m_LargeStrategy.IterateChunks(DecompressedIds, AsyncCallback))
+ if (!m_LargeStrategy.IterateChunks(DecompressedIds, [&](size_t Index, const IoBuffer& Payload) {
+ IoBuffer Chunk(Payload);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+ return AsyncCallback(Index, Payload);
+ }))
{
return false;
}