aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-12-12 08:27:54 +0100
committerGitHub Enterprise <[email protected]>2024-12-12 08:27:54 +0100
commit9bb2bf10a76127fea1db01fab42c795bdc07c936 (patch)
tree4bdb9d40ee265798afe4ec439dea45b7a7c5ed3c /src/zenserver/projectstore/remoteprojectstore.cpp
parentMemory tracking improvements (#262) (diff)
downloadzen-9bb2bf10a76127fea1db01fab42c795bdc07c936.tar.xz
zen-9bb2bf10a76127fea1db01fab42c795bdc07c936.zip
Builds API remote project store (#258)
Feature: zen command oplog-export and oplog-import now supports --builds remote target using the Jupiter builds API
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp323
1 files changed, 174 insertions, 149 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 49403d39c..216b1c4dd 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -143,13 +143,7 @@ namespace remotestore_impl {
NiceBytes(Stats.m_PeakReceivedBytes));
}
- struct Block
- {
- IoHash BlockHash;
- std::vector<IoHash> ChunksInBlock;
- };
-
- size_t AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
+ size_t AddBlock(RwLock& BlocksLock, std::vector<RemoteProjectStore::Block>& Blocks)
{
size_t BlockIndex;
{
@@ -741,14 +735,14 @@ namespace remotestore_impl {
});
};
- void CreateBlock(WorkerThreadPool& WorkerPool,
- Latch& OpSectionsLatch,
- std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
- RwLock& SectionsLock,
- std::vector<Block>& Blocks,
- size_t BlockIndex,
- const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
- AsyncRemoteResult& RemoteResult)
+ void CreateBlock(WorkerThreadPool& WorkerPool,
+ Latch& OpSectionsLatch,
+ std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
+ RwLock& SectionsLock,
+ std::vector<RemoteProjectStore::Block>& Blocks,
+ size_t BlockIndex,
+ const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
+ AsyncRemoteResult& RemoteResult)
{
OpSectionsLatch.AddCount(1);
WorkerPool.ScheduleWork([&Blocks,
@@ -767,16 +761,17 @@ namespace remotestore_impl {
try
{
ZEN_ASSERT(ChunkCount > 0);
- Stopwatch Timer;
- CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks));
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ Stopwatch Timer;
+ RemoteProjectStore::Block Block;
+ CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks), Block);
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
{
// We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex].BlockHash = BlockHash;
+ Blocks[BlockIndex] = Block;
}
uint64_t BlockSize = CompressedBlock.GetCompressedSize();
- AsyncOnBlock(std::move(CompressedBlock), BlockHash);
+ AsyncOnBlock(std::move(CompressedBlock), std::move(Block));
ZEN_INFO("Generated block with {} attachments in {} ({})",
ChunkCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
@@ -800,12 +795,18 @@ namespace remotestore_impl {
std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0;
};
+ struct CreatedBlock
+ {
+ IoBuffer Payload;
+ RemoteProjectStore::Block Block;
+ };
+
void UploadAttachments(WorkerThreadPool& WorkerPool,
CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks,
- const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
+ const std::unordered_map<IoHash, CreatedBlock, IoHash::Hasher>& CreatedBlocks,
const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments,
const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
bool ForceAll,
@@ -927,12 +928,12 @@ namespace remotestore_impl {
}
try
{
- bool IsBlock = false;
- IoBuffer Payload;
+ IoBuffer Payload;
+ RemoteProjectStore::Block Block;
if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
{
- Payload = BlockIt->second;
- IsBlock = true;
+ Payload = BlockIt->second.Payload;
+ Block = BlockIt->second.Block;
}
else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
{
@@ -953,9 +954,10 @@ namespace remotestore_impl {
RemoteResult.GetErrorReason());
return;
}
+ const bool IsBlock = Block.BlockHash == RawHash;
size_t PayloadSize = Payload.GetSize();
RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block));
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
@@ -1176,13 +1178,67 @@ IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuff
return true;
};
+std::vector<IoHash>
+GetBlockHashesFromOplog(CbObjectView ContainerObject)
+{
+ using namespace std::literals;
+ std::vector<RemoteProjectStore::Block> Result;
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+
+ std::vector<IoHash> BlockHashes;
+ BlockHashes.reserve(BlocksArray.Num());
+ for (CbFieldView BlockField : BlocksArray)
+ {
+ CbObjectView BlockView = BlockField.AsObjectView();
+ IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
+ BlockHashes.push_back(BlockHash);
+ }
+ return BlockHashes;
+}
+
+std::vector<RemoteProjectStore::Block>
+GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes)
+{
+ using namespace std::literals;
+ std::vector<RemoteProjectStore::Block> Result;
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+ tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet;
+ IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end());
+
+ Result.reserve(IncludeBlockHashes.size());
+ for (CbFieldView BlockField : BlocksArray)
+ {
+ CbObjectView BlockView = BlockField.AsObjectView();
+ IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
+ if (IncludeSet.contains(BlockHash))
+ {
+ std::vector<IoHash> ChunkHashes;
+ CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
+ if (BlockHash == IoHash::Zero)
+ {
+ continue;
+ }
+ ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ ChunkHashes.push_back(ChunkField.AsHash());
+ }
+ Result.push_back({.BlockHash = BlockHash, .ChunkHashes = std::move(ChunkHashes)});
+ }
+ }
+ return Result;
+}
+
CompressedBuffer
-GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks)
+GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock)
{
+ const size_t ChunkCount = FetchChunks.size();
+
std::vector<SharedBuffer> ChunkSegments;
ChunkSegments.resize(1);
- ChunkSegments.reserve(1 + FetchChunks.size());
- size_t ChunkCount = FetchChunks.size();
+ ChunkSegments.reserve(1 + ChunkCount);
+ OutBlock.ChunkHashes.reserve(ChunkCount);
+ OutBlock.ChunkLengths.reserve(ChunkCount);
{
IoBuffer TempBuffer(ChunkCount * 9);
MutableMemoryView View = TempBuffer.GetMutableView();
@@ -1200,6 +1256,8 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks)
ChunkSegments.push_back(Segment);
}
BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr);
+ OutBlock.ChunkHashes.push_back(It.first);
+ OutBlock.ChunkLengths.push_back(gsl::narrow<uint32_t>(ChunkSize));
}
ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
@@ -1207,7 +1265,8 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks)
}
CompressedBuffer CompressedBlock =
CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
-
+ OutBlock.BlockHash = CompressedBlock.DecodeRawHash();
+ OutBlock.FirstChunkOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + ChunkSegments[0].GetSize());
return CompressedBlock;
}
@@ -1221,9 +1280,9 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::vector<remotestore_impl::Block>& KnownBlocks,
+ const std::vector<RemoteProjectStore::Block>& KnownBlocks,
WorkerThreadPool& WorkerPool,
- const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles,
@@ -1245,9 +1304,9 @@ BuildContainer(CidStore& ChunkStore,
std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
- RwLock BlocksLock;
- std::vector<remotestore_impl::Block> Blocks;
- CompressedBuffer OpsBuffer;
+ RwLock BlocksLock;
+ std::vector<RemoteProjectStore::Block> Blocks;
+ CompressedBuffer OpsBuffer;
std::filesystem::path AttachmentTempPath = Oplog.TempPath();
AttachmentTempPath.append(".pending");
@@ -1463,7 +1522,7 @@ BuildContainer(CidStore& ChunkStore,
return {};
}
- auto FindReuseBlocks = [](const std::vector<remotestore_impl::Block>& KnownBlocks,
+ auto FindReuseBlocks = [](const std::vector<RemoteProjectStore::Block>& KnownBlocks,
const std::unordered_set<IoHash, IoHash::Hasher>& Attachments,
JobContext* OptionalContext) -> std::vector<size_t> {
std::vector<size_t> ReuseBlockIndexes;
@@ -1476,14 +1535,14 @@ BuildContainer(CidStore& ChunkStore,
for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
{
- const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
- size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
+ const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size();
if (BlockAttachmentCount == 0)
{
continue;
}
size_t FoundAttachmentCount = 0;
- for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
+ for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (Attachments.contains(KnownHash))
{
@@ -1524,8 +1583,8 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
- for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
+ const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (UploadAttachments.erase(KnownHash) == 1)
{
@@ -1865,8 +1924,8 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
- for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
+ const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (ChunkedHashes.erase(KnownHash) == 1)
{
@@ -1978,7 +2037,7 @@ BuildContainer(CidStore& ChunkStore,
uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
auto NewBlock = [&]() {
- size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
size_t ChunkCount = ChunksInBlock.size();
if (BuildBlocks)
{
@@ -2000,9 +2059,9 @@ BuildContainer(CidStore& ChunkStore,
{
// We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
+ Blocks[BlockIndex].ChunkHashes.insert(Blocks[BlockIndex].ChunkHashes.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
}
uint64_t NowMS = Timer.GetElapsedTimeMs();
ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
@@ -2234,12 +2293,11 @@ BuildContainer(CidStore& ChunkStore,
CbObjectWriter OplogContinerWriter;
RwLock::SharedLockScope _(BlocksLock);
OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
-
OplogContinerWriter.BeginArray("blocks"sv);
{
- for (const remotestore_impl::Block& B : Blocks)
+ for (const RemoteProjectStore::Block& B : Blocks)
{
- ZEN_ASSERT(!B.ChunksInBlock.empty());
+ ZEN_ASSERT(!B.ChunkHashes.empty());
if (BuildBlocks)
{
ZEN_ASSERT(B.BlockHash != IoHash::Zero);
@@ -2249,7 +2307,7 @@ BuildContainer(CidStore& ChunkStore,
OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash);
OplogContinerWriter.BeginArray("chunks"sv);
{
- for (const IoHash& RawHash : B.ChunksInBlock)
+ for (const IoHash& RawHash : B.ChunkHashes)
{
OplogContinerWriter.AddHash(RawHash);
}
@@ -2265,7 +2323,7 @@ BuildContainer(CidStore& ChunkStore,
{
OplogContinerWriter.BeginArray("chunks"sv);
{
- for (const IoHash& RawHash : B.ChunksInBlock)
+ for (const IoHash& RawHash : B.ChunkHashes)
{
OplogContinerWriter.AddBinaryAttachment(RawHash);
}
@@ -2331,7 +2389,7 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
@@ -2391,21 +2449,21 @@ SaveOplog(CidStore& ChunkStore,
CreateDirectories(AttachmentTempPath);
}
- remotestore_impl::AsyncRemoteResult RemoteResult;
- RwLock AttachmentsLock;
- std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
- tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
+ remotestore_impl::AsyncRemoteResult RemoteResult;
+ RwLock AttachmentsLock;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
+ std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks;
+ tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
- auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
- const IoHash& BlockHash) {
+ auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
+ RemoteProjectStore::Block&& Block) {
std::filesystem::path BlockPath = AttachmentTempPath;
- BlockPath.append(BlockHash.ToHexString());
+ BlockPath.append(Block.BlockHash.ToHexString());
try
{
IoBuffer BlockBuffer = remotestore_impl::WriteToTempFile(std::move(CompressedBlock), BlockPath);
RwLock::ExclusiveLockScope __(AttachmentsLock);
- CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
+ CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}});
ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize()));
}
catch (const std::exception& Ex)
@@ -2417,8 +2475,11 @@ SaveOplog(CidStore& ChunkStore,
}
};
- auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
- RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash);
+ auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock,
+ RemoteProjectStore::Block&& Block) {
+ IoHash BlockHash = Block.BlockHash;
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block));
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
@@ -2448,7 +2509,7 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Found attachment {}", AttachmentHash);
};
- std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock;
+ std::function<void(CompressedBuffer&&, RemoteProjectStore::Block &&)> OnBlock;
if (RemoteStoreInfo.UseTempBlockFiles)
{
OnBlock = MakeTempBlock;
@@ -2458,95 +2519,52 @@ SaveOplog(CidStore& ChunkStore,
OnBlock = UploadBlock;
}
- std::vector<remotestore_impl::Block> KnownBlocks;
+ std::vector<RemoteProjectStore::Block> KnownBlocks;
uint64_t TransferWallTimeMS = 0;
- if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty())
+ RemoteProjectStore::CreateContainerResult ContainerResult = RemoteStore.CreateContainer();
+ if (ContainerResult.ErrorCode)
{
+ RemoteProjectStore::Result Result = {.ErrorCode = ContainerResult.ErrorCode,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Text = fmt::format("Failed to create container for oplog '{}' ({}): {}",
+ RemoteStoreInfo.ContainerName,
+ ContainerResult.ErrorCode,
+ ContainerResult.Reason)};
remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName));
- Stopwatch LoadBaseContainerTimer;
- RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer();
- TransferWallTimeMS += LoadBaseContainerTimer.GetElapsedTimeMs();
-
- if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent))
- {
- if (BaseContainerResult.ErrorCode)
- {
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments",
- RemoteStoreInfo.BaseContainerName,
- BaseContainerResult.ErrorCode,
- BaseContainerResult.Reason));
- }
- else
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Loaded oplog base container in {}",
- NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0))));
-
- CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView();
-
- std::vector<IoHash> BlockHashes;
- BlockHashes.reserve(BlocksArray.Num());
- for (CbFieldView BlockField : BlocksArray)
- {
- CbObjectView BlockView = BlockField.AsObjectView();
- IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
- BlockHashes.push_back(BlockHash);
- }
-
- RemoteProjectStore::HasAttachmentsResult HasResult = RemoteStore.HasAttachments(BlockHashes);
- if (HasResult.ErrorCode == 0)
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Checked the existance of {} block{} in remote store, found {} existing blocks in {}",
- BlockHashes.size(),
- BlockHashes.size() > 1 ? "s"sv : ""sv,
- BlockHashes.size() - HasResult.Needs.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0))));
- if (HasResult.Needs.size() < BlocksArray.Num())
- {
- KnownBlocks.reserve(BlocksArray.Num() - HasResult.Needs.size());
-
- const std::unordered_set<IoHash, IoHash::Hasher> MissingBlocks(HasResult.Needs);
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return Result;
+ }
- for (CbFieldView BlockField : BlocksArray)
- {
- CbObjectView BlockView = BlockField.AsObjectView();
- IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
- if (!MissingBlocks.contains(BlockHash))
- {
- std::vector<IoHash> ChunksInBlock;
- CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
- if (BlockHash == IoHash::Zero)
- {
- continue;
- }
+ if (RemoteStoreInfo.CreateBlocks)
+ {
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching known blocks from '{}'", RemoteStoreInfo.Description));
+ Stopwatch GetKnownBlocksTimer;
+ RemoteProjectStore::GetKnownBlocksResult KnownBlocksResult = RemoteStore.GetKnownBlocks();
+ TransferWallTimeMS += GetKnownBlocksTimer.GetElapsedTimeMs();
- ChunksInBlock.reserve(ChunksArray.Num());
- for (CbFieldView ChunkField : ChunksArray)
- {
- ChunksInBlock.push_back(ChunkField.AsHash());
- }
- KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)});
- }
- }
- }
- }
- else
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Unable to determine which blocks in base container exist in remote store, assuming none "
- "does: '{}', error code : {}",
- HasResult.Reason,
- HasResult.ErrorCode));
- }
- }
+ if (KnownBlocksResult.ErrorCode == static_cast<int>(HttpResponseCode::NoContent))
+ {
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("No known blocks in '{}', uploading all attachments", RemoteStoreInfo.Description));
+ }
+ else if (KnownBlocksResult.ErrorCode)
+ {
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Failed to get known blocks from '{}' ({}): {}, uploading all attachments",
+ RemoteStoreInfo.Description,
+ KnownBlocksResult.ErrorCode,
+ KnownBlocksResult.Reason));
+ }
+ else
+ {
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Fetched {} known blocks from '{}' in {}",
+ KnownBlocksResult.Blocks.size(),
+ RemoteStoreInfo.Description,
+ NiceTimeSpanMs(static_cast<uint64_t>(KnownBlocksResult.ElapsedSeconds * 1000.0))));
+ KnownBlocks = std::move(KnownBlocksResult.Blocks);
}
}
@@ -2588,15 +2606,22 @@ SaveOplog(CidStore& ChunkStore,
RemoteStoreInfo.ContainerName,
ChunkCount,
BlockCount));
- Stopwatch SaveContainerTimer;
- RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
+ Stopwatch SaveContainerTimer;
+ IoBuffer ContainerPayload = OplogContainerObject.GetBuffer().AsIoBuffer();
+ ContainerPayload.SetContentType(ZenContentType::kCbObject);
+ RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(std::move(ContainerPayload));
TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs();
if (ContainerSaveResult.ErrorCode)
{
RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
+ RemoteProjectStore::Result Result = {
+ .ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Text = fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())};
remotestore_impl::ReportMessage(
OptionalContext,
fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return Result;
}
else
{