diff options
| author | Dan Engelbrecht <[email protected]> | 2024-12-12 08:27:54 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-12-12 08:27:54 +0100 |
| commit | 9bb2bf10a76127fea1db01fab42c795bdc07c936 (patch) | |
| tree | 4bdb9d40ee265798afe4ec439dea45b7a7c5ed3c /src/zenserver/projectstore/remoteprojectstore.cpp | |
| parent | Memory tracking improvements (#262) (diff) | |
| download | zen-9bb2bf10a76127fea1db01fab42c795bdc07c936.tar.xz zen-9bb2bf10a76127fea1db01fab42c795bdc07c936.zip | |
Builds API remote project store (#258)
Feature: zen command oplog-export and oplog-import now supports --builds remote target using the Jupiter builds API
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 323 |
1 files changed, 174 insertions, 149 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 49403d39c..216b1c4dd 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -143,13 +143,7 @@ namespace remotestore_impl { NiceBytes(Stats.m_PeakReceivedBytes)); } - struct Block - { - IoHash BlockHash; - std::vector<IoHash> ChunksInBlock; - }; - - size_t AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks) + size_t AddBlock(RwLock& BlocksLock, std::vector<RemoteProjectStore::Block>& Blocks) { size_t BlockIndex; { @@ -741,14 +735,14 @@ namespace remotestore_impl { }); }; - void CreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, - std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, - RwLock& SectionsLock, - std::vector<Block>& Blocks, - size_t BlockIndex, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) + void CreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, + RwLock& SectionsLock, + std::vector<RemoteProjectStore::Block>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) { OpSectionsLatch.AddCount(1); WorkerPool.ScheduleWork([&Blocks, @@ -767,16 +761,17 @@ namespace remotestore_impl { try { ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); + Stopwatch Timer; + RemoteProjectStore::Block Block; + CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks), Block); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex].BlockHash = BlockHash; + Blocks[BlockIndex] = Block; } uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - AsyncOnBlock(std::move(CompressedBlock), BlockHash); + AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); ZEN_INFO("Generated block with {} attachments in {} ({})", ChunkCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), @@ -800,12 +795,18 @@ namespace remotestore_impl { std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0; }; + struct CreatedBlock + { + IoBuffer Payload; + RemoteProjectStore::Block Block; + }; + void UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, RemoteProjectStore& RemoteStore, const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments, const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks, - const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks, + const std::unordered_map<IoHash, CreatedBlock, IoHash::Hasher>& CreatedBlocks, const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments, const std::unordered_set<IoHash, IoHash::Hasher>& Needs, bool ForceAll, @@ -927,12 +928,12 @@ namespace remotestore_impl { } try { - bool IsBlock = false; - IoBuffer Payload; + IoBuffer Payload; + RemoteProjectStore::Block Block; if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { - Payload = BlockIt->second; - IsBlock = true; + Payload = BlockIt->second.Payload; + Block = BlockIt->second.Block; } else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) { @@ -953,9 +954,10 @@ namespace remotestore_impl { RemoteResult.GetErrorReason()); return; } + const bool IsBlock = Block.BlockHash == RawHash; size_t PayloadSize = Payload.GetSize(); RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash); + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block)); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); @@ -1176,13 +1178,67 @@ IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuff return true; }; +std::vector<IoHash> +GetBlockHashesFromOplog(CbObjectView ContainerObject) +{ + using namespace std::literals; + std::vector<RemoteProjectStore::Block> Result; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + + std::vector<IoHash> BlockHashes; + BlockHashes.reserve(BlocksArray.Num()); + for (CbFieldView BlockField : BlocksArray) + { + CbObjectView BlockView = BlockField.AsObjectView(); + IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); + BlockHashes.push_back(BlockHash); + } + return BlockHashes; +} + +std::vector<RemoteProjectStore::Block> +GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes) +{ + using namespace std::literals; + std::vector<RemoteProjectStore::Block> Result; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet; + IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end()); + + Result.reserve(IncludeBlockHashes.size()); + for (CbFieldView BlockField : BlocksArray) + { + CbObjectView BlockView = BlockField.AsObjectView(); + IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); + if (IncludeSet.contains(BlockHash)) + { + std::vector<IoHash> ChunkHashes; + CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); + if (BlockHash == IoHash::Zero) + { + continue; + } + ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkField : ChunksArray) + { + ChunkHashes.push_back(ChunkField.AsHash()); + } + Result.push_back({.BlockHash = BlockHash, .ChunkHashes = std::move(ChunkHashes)}); + } + } + return Result; +} + CompressedBuffer -GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks) +GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock) { + const size_t ChunkCount = FetchChunks.size(); + std::vector<SharedBuffer> ChunkSegments; ChunkSegments.resize(1); - ChunkSegments.reserve(1 + FetchChunks.size()); - size_t ChunkCount = FetchChunks.size(); + ChunkSegments.reserve(1 + ChunkCount); + OutBlock.ChunkHashes.reserve(ChunkCount); + OutBlock.ChunkLengths.reserve(ChunkCount); { IoBuffer TempBuffer(ChunkCount * 9); MutableMemoryView View = TempBuffer.GetMutableView(); @@ -1200,6 +1256,8 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks) ChunkSegments.push_back(Segment); } BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr); + OutBlock.ChunkHashes.push_back(It.first); + OutBlock.ChunkLengths.push_back(gsl::narrow<uint32_t>(ChunkSize)); } ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); @@ -1207,7 +1265,8 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks) } CompressedBuffer CompressedBlock = CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); - + OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); + OutBlock.FirstChunkOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + ChunkSegments[0].GetSize()); return CompressedBlock; } @@ -1221,9 +1280,9 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::vector<remotestore_impl::Block>& KnownBlocks, + const std::vector<RemoteProjectStore::Block>& KnownBlocks, WorkerThreadPool& WorkerPool, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles, @@ -1245,9 +1304,9 @@ BuildContainer(CidStore& ChunkStore, std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; - RwLock BlocksLock; - std::vector<remotestore_impl::Block> Blocks; - CompressedBuffer OpsBuffer; + RwLock BlocksLock; + std::vector<RemoteProjectStore::Block> Blocks; + CompressedBuffer OpsBuffer; std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); @@ -1463,7 +1522,7 @@ BuildContainer(CidStore& ChunkStore, return {}; } - auto FindReuseBlocks = [](const std::vector<remotestore_impl::Block>& KnownBlocks, + auto FindReuseBlocks = [](const std::vector<RemoteProjectStore::Block>& KnownBlocks, const std::unordered_set<IoHash, IoHash::Hasher>& Attachments, JobContext* OptionalContext) -> std::vector<size_t> { std::vector<size_t> ReuseBlockIndexes; @@ -1476,14 +1535,14 @@ BuildContainer(CidStore& ChunkStore, for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { - const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); + const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size(); if (BlockAttachmentCount == 0) { continue; } size_t FoundAttachmentCount = 0; - for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) + for (const IoHash& KnownHash : KnownBlock.ChunkHashes) { if (Attachments.contains(KnownHash)) { @@ -1524,8 +1583,8 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) + const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunkHashes) { if (UploadAttachments.erase(KnownHash) == 1) { @@ -1865,8 +1924,8 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) + const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunkHashes) { if (ChunkedHashes.erase(KnownHash) == 1) { @@ -1978,7 +2037,7 @@ BuildContainer(CidStore& ChunkStore, uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs(); std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; auto NewBlock = [&]() { - size_t BlockIndex = AddBlock(BlocksLock, Blocks); + size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); size_t ChunkCount = ChunksInBlock.size(); if (BuildBlocks) { @@ -2000,9 +2059,9 @@ BuildContainer(CidStore& ChunkStore, { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); + Blocks[BlockIndex].ChunkHashes.insert(Blocks[BlockIndex].ChunkHashes.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); } uint64_t NowMS = Timer.GetElapsedTimeMs(); ZEN_INFO("Assembled block {} with {} chunks in {} ({})", @@ -2234,12 +2293,11 @@ BuildContainer(CidStore& ChunkStore, CbObjectWriter OplogContinerWriter; RwLock::SharedLockScope _(BlocksLock); OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); - OplogContinerWriter.BeginArray("blocks"sv); { - for (const remotestore_impl::Block& B : Blocks) + for (const RemoteProjectStore::Block& B : Blocks) { - ZEN_ASSERT(!B.ChunksInBlock.empty()); + ZEN_ASSERT(!B.ChunkHashes.empty()); if (BuildBlocks) { ZEN_ASSERT(B.BlockHash != IoHash::Zero); @@ -2249,7 +2307,7 @@ BuildContainer(CidStore& ChunkStore, OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); OplogContinerWriter.BeginArray("chunks"sv); { - for (const IoHash& RawHash : B.ChunksInBlock) + for (const IoHash& RawHash : B.ChunkHashes) { OplogContinerWriter.AddHash(RawHash); } @@ -2265,7 +2323,7 @@ BuildContainer(CidStore& ChunkStore, { OplogContinerWriter.BeginArray("chunks"sv); { - for (const IoHash& RawHash : B.ChunksInBlock) + for (const IoHash& RawHash : B.ChunkHashes) { OplogContinerWriter.AddBinaryAttachment(RawHash); } @@ -2331,7 +2389,7 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) @@ -2391,21 +2449,21 @@ SaveOplog(CidStore& ChunkStore, CreateDirectories(AttachmentTempPath); } - remotestore_impl::AsyncRemoteResult RemoteResult; - RwLock AttachmentsLock; - std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks; - tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; + remotestore_impl::AsyncRemoteResult RemoteResult; + RwLock AttachmentsLock; + std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; + std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks; + tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; - auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, - const IoHash& BlockHash) { + auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + RemoteProjectStore::Block&& Block) { std::filesystem::path BlockPath = AttachmentTempPath; - BlockPath.append(BlockHash.ToHexString()); + BlockPath.append(Block.BlockHash.ToHexString()); try { IoBuffer BlockBuffer = remotestore_impl::WriteToTempFile(std::move(CompressedBlock), BlockPath); RwLock::ExclusiveLockScope __(AttachmentsLock); - CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); + CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}}); ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize())); } catch (const std::exception& Ex) @@ -2417,8 +2475,11 @@ SaveOplog(CidStore& ChunkStore, } }; - auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { - RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); + auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, + RemoteProjectStore::Block&& Block) { + IoHash BlockHash = Block.BlockHash; + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block)); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); @@ -2448,7 +2509,7 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Found attachment {}", AttachmentHash); }; - std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock; + std::function<void(CompressedBuffer&&, RemoteProjectStore::Block &&)> OnBlock; if (RemoteStoreInfo.UseTempBlockFiles) { OnBlock = MakeTempBlock; @@ -2458,95 +2519,52 @@ SaveOplog(CidStore& ChunkStore, OnBlock = UploadBlock; } - std::vector<remotestore_impl::Block> KnownBlocks; + std::vector<RemoteProjectStore::Block> KnownBlocks; uint64_t TransferWallTimeMS = 0; - if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty()) + RemoteProjectStore::CreateContainerResult ContainerResult = RemoteStore.CreateContainer(); + if (ContainerResult.ErrorCode) { + RemoteProjectStore::Result Result = {.ErrorCode = ContainerResult.ErrorCode, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Text = fmt::format("Failed to create container for oplog '{}' ({}): {}", + RemoteStoreInfo.ContainerName, + ContainerResult.ErrorCode, + ContainerResult.Reason)}; remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName)); - Stopwatch LoadBaseContainerTimer; - RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer(); - TransferWallTimeMS += LoadBaseContainerTimer.GetElapsedTimeMs(); - - if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent)) - { - if (BaseContainerResult.ErrorCode) - { - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments", - RemoteStoreInfo.BaseContainerName, - BaseContainerResult.ErrorCode, - BaseContainerResult.Reason)); - } - else - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Loaded oplog base container in {}", - NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0)))); - - CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView(); - - std::vector<IoHash> BlockHashes; - BlockHashes.reserve(BlocksArray.Num()); - for (CbFieldView BlockField : BlocksArray) - { - CbObjectView BlockView = BlockField.AsObjectView(); - IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); - BlockHashes.push_back(BlockHash); - } - - RemoteProjectStore::HasAttachmentsResult HasResult = RemoteStore.HasAttachments(BlockHashes); - if (HasResult.ErrorCode == 0) - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Checked the existance of {} block{} in remote store, found {} existing blocks in {}", - BlockHashes.size(), - BlockHashes.size() > 1 ? "s"sv : ""sv, - BlockHashes.size() - HasResult.Needs.size(), - NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0)))); - if (HasResult.Needs.size() < BlocksArray.Num()) - { - KnownBlocks.reserve(BlocksArray.Num() - HasResult.Needs.size()); - - const std::unordered_set<IoHash, IoHash::Hasher> MissingBlocks(HasResult.Needs); + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return Result; + } - for (CbFieldView BlockField : BlocksArray) - { - CbObjectView BlockView = BlockField.AsObjectView(); - IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); - if (!MissingBlocks.contains(BlockHash)) - { - std::vector<IoHash> ChunksInBlock; - CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); - if (BlockHash == IoHash::Zero) - { - continue; - } + if (RemoteStoreInfo.CreateBlocks) + { + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching known blocks from '{}'", RemoteStoreInfo.Description)); + Stopwatch GetKnownBlocksTimer; + RemoteProjectStore::GetKnownBlocksResult KnownBlocksResult = RemoteStore.GetKnownBlocks(); + TransferWallTimeMS += GetKnownBlocksTimer.GetElapsedTimeMs(); - ChunksInBlock.reserve(ChunksArray.Num()); - for (CbFieldView ChunkField : ChunksArray) - { - ChunksInBlock.push_back(ChunkField.AsHash()); - } - KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)}); - } - } - } - } - else - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Unable to determine which blocks in base container exist in remote store, assuming none " - "does: '{}', error code : {}", - HasResult.Reason, - HasResult.ErrorCode)); - } - } + if (KnownBlocksResult.ErrorCode == static_cast<int>(HttpResponseCode::NoContent)) + { + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("No known blocks in '{}', uploading all attachments", RemoteStoreInfo.Description)); + } + else if (KnownBlocksResult.ErrorCode) + { + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Failed to get known blocks from '{}' ({}): {}, uploading all attachments", + RemoteStoreInfo.Description, + KnownBlocksResult.ErrorCode, + KnownBlocksResult.Reason)); + } + else + { + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Fetched {} known blocks from '{}' in {}", + KnownBlocksResult.Blocks.size(), + RemoteStoreInfo.Description, + NiceTimeSpanMs(static_cast<uint64_t>(KnownBlocksResult.ElapsedSeconds * 1000.0)))); + KnownBlocks = std::move(KnownBlocksResult.Blocks); } } @@ -2588,15 +2606,22 @@ SaveOplog(CidStore& ChunkStore, RemoteStoreInfo.ContainerName, ChunkCount, BlockCount)); - Stopwatch SaveContainerTimer; - RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); + Stopwatch SaveContainerTimer; + IoBuffer ContainerPayload = OplogContainerObject.GetBuffer().AsIoBuffer(); + ContainerPayload.SetContentType(ZenContentType::kCbObject); + RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(std::move(ContainerPayload)); TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs(); if (ContainerSaveResult.ErrorCode) { RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); + RemoteProjectStore::Result Result = { + .ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Text = fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())}; remotestore_impl::ReportMessage( OptionalContext, fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return Result; } else { |