aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2025-02-12 09:02:35 +0100
committer GitHub Enterprise <[email protected]> 2025-02-12 09:02:35 +0100
commit da9179d330a37132488f6deb8d8068783b087256 (patch)
tree 3309dfe685495bab7f18068f7c0d1dbd76a4b536 /src/zenserver/projectstore/remoteprojectstore.cpp
parent improved builds api interface in jupiter (#281) (diff)
download zen-da9179d330a37132488f6deb8d8068783b087256.tar.xz
zen-da9179d330a37132488f6deb8d8068783b087256.zip
moving and small refactor of chunk blocks to prepare for builds api (#282)
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r-- src/zenserver/projectstore/remoteprojectstore.cpp 226
1 files changed, 77 insertions, 149 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 0589fdc5f..5b75a840e 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -12,8 +12,8 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
-#include <zenstore/chunkedfile.h>
#include <zenstore/cidstore.h>
+#include <zenutil/chunkedfile.h>
#include <zenutil/workerpools.h>
#include <unordered_map>
@@ -143,7 +143,7 @@ namespace remotestore_impl {
NiceBytes(Stats.m_PeakReceivedBytes));
}
- size_t AddBlock(RwLock& BlocksLock, std::vector<RemoteProjectStore::Block>& Blocks)
+ size_t AddBlock(RwLock& BlocksLock, std::vector<ChunkBlockDescription>& Blocks)
{
size_t BlockIndex;
{
@@ -573,7 +573,7 @@ namespace remotestore_impl {
return;
}
- bool StoreChunksOK = IterateBlock(
+ bool StoreChunksOK = IterateChunkBlock(
BlockPayload,
[&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
const IoHash& AttachmentRawHash) {
@@ -738,14 +738,14 @@ namespace remotestore_impl {
});
};
- void CreateBlock(WorkerThreadPool& WorkerPool,
- Latch& OpSectionsLatch,
- std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
- RwLock& SectionsLock,
- std::vector<RemoteProjectStore::Block>& Blocks,
- size_t BlockIndex,
- const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
- AsyncRemoteResult& RemoteResult)
+ void CreateBlock(WorkerThreadPool& WorkerPool,
+ Latch& OpSectionsLatch,
+ std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
+ RwLock& SectionsLock,
+ std::vector<ChunkBlockDescription>& Blocks,
+ size_t BlockIndex,
+ const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
+ AsyncRemoteResult& RemoteResult)
{
OpSectionsLatch.AddCount(1);
WorkerPool.ScheduleWork([&Blocks,
@@ -764,10 +764,10 @@ namespace remotestore_impl {
try
{
ZEN_ASSERT(ChunkCount > 0);
- Stopwatch Timer;
- RemoteProjectStore::Block Block;
- CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks), Block);
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ Stopwatch Timer;
+ ChunkBlockDescription Block;
+ CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block);
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
{
// We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
RwLock::SharedLockScope __(SectionsLock);
@@ -800,8 +800,8 @@ namespace remotestore_impl {
struct CreatedBlock
{
- IoBuffer Payload;
- RemoteProjectStore::Block Block;
+ IoBuffer Payload;
+ ChunkBlockDescription Block;
};
void UploadAttachments(WorkerThreadPool& WorkerPool,
@@ -931,8 +931,8 @@ namespace remotestore_impl {
}
try
{
- IoBuffer Payload;
- RemoteProjectStore::Block Block;
+ IoBuffer Payload;
+ ChunkBlockDescription Block;
if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
{
Payload = BlockIt->second.Payload;
@@ -1058,7 +1058,7 @@ namespace remotestore_impl {
{
auto It = BulkBlockAttachmentsToUpload.find(Chunk);
ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
- CompositeBuffer ChunkPayload = It->second(It->first);
+ CompressedBuffer ChunkPayload = It->second(It->first).second;
if (!ChunkPayload)
{
RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
@@ -1067,8 +1067,8 @@ namespace remotestore_impl {
ChunkBuffers.clear();
break;
}
- ChunksSize += ChunkPayload.GetSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer()));
+ ChunksSize += ChunkPayload.GetCompressedSize();
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer()));
}
RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
if (Result.ErrorCode)
@@ -1139,54 +1139,13 @@ namespace remotestore_impl {
}
} // namespace remotestore_impl
-bool
-IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
-{
- ZEN_ASSERT(BlockPayload);
- if (BlockPayload.GetSize() < 1)
- {
- return false;
- }
-
- MemoryView BlockView = BlockPayload.GetView();
- const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
- uint32_t NumberSize;
- uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize);
- ReadPtr += NumberSize;
- std::vector<uint64_t> ChunkSizes;
- ChunkSizes.reserve(ChunkCount);
- while (ChunkCount--)
- {
- ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize));
- ReadPtr += NumberSize;
- }
- ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr);
- ZEN_ASSERT(TempBufferLength > 0);
- for (uint64_t ChunkSize : ChunkSizes)
- {
- IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize);
- IoHash AttachmentRawHash;
- uint64_t AttachmentRawSize;
- CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize);
-
- if (!CompressedChunk)
- {
- ZEN_ERROR("Invalid chunk in block");
- return false;
- }
- Visitor(std::move(CompressedChunk), AttachmentRawHash);
- ReadPtr += ChunkSize;
- ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd());
- }
- return true;
-};
std::vector<IoHash>
GetBlockHashesFromOplog(CbObjectView ContainerObject)
{
using namespace std::literals;
- std::vector<RemoteProjectStore::Block> Result;
- CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+ std::vector<ChunkBlockDescription> Result;
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
std::vector<IoHash> BlockHashes;
BlockHashes.reserve(BlocksArray.Num());
@@ -1199,11 +1158,11 @@ GetBlockHashesFromOplog(CbObjectView ContainerObject)
return BlockHashes;
}
-std::vector<RemoteProjectStore::Block>
+std::vector<ChunkBlockDescription>
GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes)
{
using namespace std::literals;
- std::vector<RemoteProjectStore::Block> Result;
+ std::vector<ChunkBlockDescription> Result;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet;
IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end());
@@ -1232,47 +1191,6 @@ GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> Include
return Result;
}
-CompressedBuffer
-GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock)
-{
- const size_t ChunkCount = FetchChunks.size();
-
- std::vector<SharedBuffer> ChunkSegments;
- ChunkSegments.resize(1);
- ChunkSegments.reserve(1 + ChunkCount);
- OutBlock.ChunkHashes.reserve(ChunkCount);
- OutBlock.ChunkLengths.reserve(ChunkCount);
- {
- IoBuffer TempBuffer(ChunkCount * 9);
- MutableMemoryView View = TempBuffer.GetMutableView();
- uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData());
- uint8_t* BufferEndPtr = BufferStartPtr;
- BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
- for (const auto& It : FetchChunks)
- {
- CompositeBuffer Chunk = It.second(It.first);
- uint64_t ChunkSize = 0;
- std::span<const SharedBuffer> Segments = Chunk.GetSegments();
- for (const SharedBuffer& Segment : Segments)
- {
- ChunkSize += Segment.GetSize();
- ChunkSegments.push_back(Segment);
- }
- BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr);
- OutBlock.ChunkHashes.push_back(It.first);
- OutBlock.ChunkLengths.push_back(gsl::narrow<uint32_t>(ChunkSize));
- }
- ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
- ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
- ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
- }
- CompressedBuffer CompressedBlock =
- CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
- OutBlock.BlockHash = CompressedBlock.DecodeRawHash();
- OutBlock.FirstChunkOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + ChunkSegments[0].GetSize());
- return CompressedBlock;
-}
-
CbObject
BuildContainer(CidStore& ChunkStore,
ProjectStore::Project& Project,
@@ -1283,9 +1201,9 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::vector<RemoteProjectStore::Block>& KnownBlocks,
+ const std::vector<ChunkBlockDescription>& KnownBlocks,
WorkerThreadPool& WorkerPool,
- const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles,
@@ -1307,9 +1225,9 @@ BuildContainer(CidStore& ChunkStore,
std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
- RwLock BlocksLock;
- std::vector<RemoteProjectStore::Block> Blocks;
- CompressedBuffer OpsBuffer;
+ RwLock BlocksLock;
+ std::vector<ChunkBlockDescription> Blocks;
+ CompressedBuffer OpsBuffer;
std::filesystem::path AttachmentTempPath = Oplog.TempPath();
AttachmentTempPath.append(".pending");
@@ -1525,7 +1443,7 @@ BuildContainer(CidStore& ChunkStore,
return {};
}
- auto FindReuseBlocks = [](const std::vector<RemoteProjectStore::Block>& KnownBlocks,
+ auto FindReuseBlocks = [](const std::vector<ChunkBlockDescription>& KnownBlocks,
const std::unordered_set<IoHash, IoHash::Hasher>& Attachments,
JobContext* OptionalContext) -> std::vector<size_t> {
std::vector<size_t> ReuseBlockIndexes;
@@ -1538,8 +1456,8 @@ BuildContainer(CidStore& ChunkStore,
for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
{
- const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
- size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size();
+ const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
+ size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size();
if (BlockAttachmentCount == 0)
{
continue;
@@ -1586,7 +1504,7 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (UploadAttachments.erase(KnownHash) == 1)
@@ -1632,12 +1550,12 @@ BuildContainer(CidStore& ChunkStore,
return Chunked;
};
- RwLock ResolveLock;
- std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
- std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
- std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments;
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments;
- std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
+ RwLock ResolveLock;
+ std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments;
+ std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments;
+ std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
remotestore_impl::ReportMessage(OptionalContext,
fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
@@ -1730,7 +1648,7 @@ BuildContainer(CidStore& ChunkStore,
}
else
{
- size_t RawSize = RawData.GetSize();
+ uint64_t RawSize = RawData.GetSize();
CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
OodleCompressor::Mermaid,
OodleCompressionLevel::VeryFast);
@@ -1753,8 +1671,8 @@ BuildContainer(CidStore& ChunkStore,
{
UploadAttachment->Size = Compressed.GetCompressedSize();
ResolveLock.WithExclusiveLock(
- [RawHash, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
- LooseUploadAttachments.insert_or_assign(RawHash, std::move(Data));
+ [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
+ LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
});
}
}
@@ -1927,7 +1845,7 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (ChunkedHashes.erase(KnownHash) == 1)
@@ -2109,16 +2027,25 @@ BuildContainer(CidStore& ChunkStore,
{
if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
{
- ChunksInBlock.emplace_back(std::make_pair(RawHash, [IoBuffer = SharedBuffer(It->second)](const IoHash&) {
- return CompositeBuffer(IoBuffer);
- }));
+ ChunksInBlock.emplace_back(std::make_pair(
+ RawHash,
+ [RawSize = It->second.first,
+ IoBuffer = SharedBuffer(It->second.second)](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+ return std::make_pair(RawSize, CompressedBuffer::FromCompressedNoValidate(IoBuffer.AsIoBuffer()));
+ }));
LooseUploadAttachments.erase(It);
}
else
{
- ChunksInBlock.emplace_back(std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) {
- return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash)));
- }));
+ ChunksInBlock.emplace_back(
+ std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> {
+ IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash);
+ IoHash _;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), _, RawSize);
+ ZEN_ASSERT(Compressed);
+ return {RawSize, Compressed};
+ }));
}
BlockSize += PayloadSize;
@@ -2169,14 +2096,15 @@ BuildContainer(CidStore& ChunkStore,
if (BlockAttachmentHashes.insert(ChunkHash).second)
{
const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
- ChunksInBlock.emplace_back(std::make_pair(
- ChunkHash,
- [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](const IoHash&) {
- return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::None)
- .GetCompressed();
- }));
+ ChunksInBlock.emplace_back(
+ std::make_pair(ChunkHash,
+ [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
+ const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+ return {Size,
+ CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::None)};
+ }));
BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
if (BuildBlocks)
{
@@ -2298,7 +2226,7 @@ BuildContainer(CidStore& ChunkStore,
OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
OplogContinerWriter.BeginArray("blocks"sv);
{
- for (const RemoteProjectStore::Block& B : Blocks)
+ for (const ChunkBlockDescription& B : Blocks)
{
ZEN_ASSERT(!B.ChunkHashes.empty());
if (BuildBlocks)
@@ -2392,7 +2320,7 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
@@ -2458,8 +2386,8 @@ SaveOplog(CidStore& ChunkStore,
std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks;
tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
- auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
- RemoteProjectStore::Block&& Block) {
+ auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
+ ChunkBlockDescription&& Block) {
std::filesystem::path BlockPath = AttachmentTempPath;
BlockPath.append(Block.BlockHash.ToHexString());
try
@@ -2478,8 +2406,8 @@ SaveOplog(CidStore& ChunkStore,
}
};
- auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock,
- RemoteProjectStore::Block&& Block) {
+ auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock,
+ ChunkBlockDescription&& Block) {
IoHash BlockHash = Block.BlockHash;
RemoteProjectStore::SaveAttachmentResult Result =
RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block));
@@ -2512,7 +2440,7 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Found attachment {}", AttachmentHash);
};
- std::function<void(CompressedBuffer&&, RemoteProjectStore::Block &&)> OnBlock;
+ std::function<void(CompressedBuffer&&, ChunkBlockDescription &&)> OnBlock;
if (RemoteStoreInfo.UseTempBlockFiles)
{
OnBlock = MakeTempBlock;
@@ -2522,7 +2450,7 @@ SaveOplog(CidStore& ChunkStore,
OnBlock = UploadBlock;
}
- std::vector<RemoteProjectStore::Block> KnownBlocks;
+ std::vector<ChunkBlockDescription> KnownBlocks;
uint64_t TransferWallTimeMS = 0;