about | summary | refs | log | tree | commit | diff
path: root/src/zenremotestore/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore/projectstore/remoteprojectstore.cpp')
-rw-r--r-- src/zenremotestore/projectstore/remoteprojectstore.cpp 946
1 files changed, 727 insertions, 219 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 8be8eb0df..2a9da6f58 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
+#include <zenremotestore/chunking/chunkedcontent.h>
#include <zenremotestore/chunking/chunkedfile.h>
#include <zenremotestore/operationlogoutput.h>
#include <zenstore/cidstore.h>
@@ -229,29 +230,60 @@ namespace remotestore_impl {
struct DownloadInfo // Download statistics; the atomic members are bumped with fetch_add from worker-pool tasks
{
- uint64_t OplogSizeBytes = 0;
- std::atomic<uint64_t> AttachmentsDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlocksDownloaded = 0;
- std::atomic<uint64_t> AttachmentBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentsStored = 0;
- std::atomic<uint64_t> AttachmentBytesStored = 0;
- std::atomic_size_t MissingAttachmentCount = 0;
+ uint64_t OplogSizeBytes = 0;
+ std::atomic<uint64_t> AttachmentsDownloaded = 0; // attachments fetched in bulk or one by one
+ std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; // whole blocks fetched by DownloadAndSaveBlock
+ std::atomic<uint64_t> AttachmentBlocksRangesDownloaded = 0; // block ranges fetched by DownloadAndSavePartialBlock
+ std::atomic<uint64_t> AttachmentBytesDownloaded = 0; // compressed bytes of bulk-loaded attachments
+ std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; // payload bytes of whole blocks
+ std::atomic<uint64_t> AttachmentBlockRangeBytesDownloaded = 0; // payload bytes of partial block ranges
+ std::atomic<uint64_t> AttachmentsStored = 0; // chunks that AddChunks reported as new
+ std::atomic<uint64_t> AttachmentBytesStored = 0; // buffer bytes of those new chunks
+ std::atomic_size_t MissingAttachmentCount = 0; // failed downloads and chunks that failed validation
};
- void DownloadAndSaveBlockChunks(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- Stopwatch& LoadAttachmentsTimer,
- std::atomic_uint64_t& DownloadStartMS,
- const std::vector<IoHash>& Chunks)
+ // OperationLogOutput adapter that forwards formatted log messages to an optional JobContext.
+ // Progress reporting is not supported: the operation name and step progress are ignored,
+ // the progress update delay is zero and no progress bar is ever created.
+ class JobContextLogOutput : public OperationLogOutput
+ {
+ public:
+     JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {}
+
+     // Formats the message and reports it through the job context; does nothing without a context.
+     virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override
+     {
+         ZEN_UNUSED(LogLevel);
+         if (!m_OptionalContext)
+         {
+             return;
+         }
+         fmt::basic_memory_buffer<char, 250> Formatted;
+         fmt::vformat_to(fmt::appender(Formatted), Format, Args);
+         const std::string_view Message(Formatted.data(), Formatted.size());
+         remotestore_impl::ReportMessage(m_OptionalContext, Message);
+     }
+
+     virtual void SetLogOperationName(std::string_view Name) override
+     {
+         ZEN_UNUSED(Name);
+     }
+
+     virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override
+     {
+         ZEN_UNUSED(StepIndex, StepCount);
+     }
+
+     virtual uint32_t GetProgressUpdateDelayMS() override
+     {
+         return 0;
+     }
+
+     virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override
+     {
+         ZEN_UNUSED(InSubTask);
+         return nullptr;
+     }
+
+ private:
+     JobContext* m_OptionalContext;  // may be null, in which case messages are dropped
+ };
+
+ void DownloadAndSaveBlockChunks(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ bool IgnoreMissingAttachments,
+ JobContext* OptionalContext,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ ThinChunkBlockDescription&& ThinBlockDescription,
+ std::vector<uint32_t>&& NeededChunkIndexes)
{
AttachmentsDownloadLatch.AddCount(1);
NetworkWorkerPool.ScheduleWork(
@@ -261,7 +293,8 @@ namespace remotestore_impl {
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
&RemoteResult,
- Chunks = Chunks,
+ ThinBlockDescription = std::move(ThinBlockDescription),
+ NeededChunkIndexes = std::move(NeededChunkIndexes),
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
@@ -276,6 +309,13 @@ namespace remotestore_impl {
}
try
{
+ std::vector<IoHash> Chunks;
+ Chunks.reserve(NeededChunkIndexes.size());
+ for (uint32_t ChunkIndex : NeededChunkIndexes)
+ {
+ Chunks.push_back(ThinBlockDescription.ChunkRawHashes[ChunkIndex]);
+ }
+
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
@@ -293,7 +333,12 @@ namespace remotestore_impl {
}
return;
}
- Info.AttachmentsDownloaded.fetch_add(Chunks.size());
+ Info.AttachmentsDownloaded.fetch_add(Result.Chunks.size());
+ for (const auto& It : Result.Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ }
ZEN_INFO("Loaded {} bulk attachments in {}",
Chunks.size(),
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
@@ -320,8 +365,6 @@ namespace remotestore_impl {
for (const auto& It : Chunks)
{
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
WriteRawHashes.push_back(It.first);
}
@@ -350,28 +393,29 @@ namespace remotestore_impl {
catch (const std::exception& Ex)
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk load {} attachments", Chunks.size()),
+ fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()),
Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
};
- void DownloadAndSaveBlock(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- Stopwatch& LoadAttachmentsTimer,
- std::atomic_uint64_t& DownloadStartMS,
- const IoHash& BlockHash,
- const std::vector<IoHash>& Chunks,
- uint32_t RetriesLeft)
+ void DownloadAndSaveBlock(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ bool IgnoreMissingAttachments,
+ JobContext* OptionalContext,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ const IoHash& BlockHash,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup,
+ std::span<std::atomic<bool>> ChunkDownloadedFlags,
+ uint32_t RetriesLeft)
{
AttachmentsDownloadLatch.AddCount(1);
NetworkWorkerPool.ScheduleWork(
@@ -381,7 +425,6 @@ namespace remotestore_impl {
&RemoteStore,
&NetworkWorkerPool,
&WorkerPool,
- BlockHash,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
@@ -389,7 +432,9 @@ namespace remotestore_impl {
IgnoreMissingAttachments,
OptionalContext,
RetriesLeft,
- Chunks = std::vector<IoHash>(Chunks)]() {
+ BlockHash = IoHash(BlockHash),
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags]() {
ZEN_TRACE_CPU("DownloadBlock");
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
@@ -401,7 +446,7 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
+ RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash, {});
if (BlockResult.ErrorCode)
{
ReportMessage(OptionalContext,
@@ -422,10 +467,10 @@ namespace remotestore_impl {
}
uint64_t BlockSize = BlockResult.Bytes.GetSize();
Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {} ({})",
- BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
- NiceBytes(BlockSize));
+ ZEN_DEBUG("Loaded block attachment '{}' in {} ({})",
+ BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlockSize));
Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
AttachmentsWriteLatch.AddCount(1);
@@ -436,7 +481,6 @@ namespace remotestore_impl {
&RemoteStore,
&NetworkWorkerPool,
&WorkerPool,
- BlockHash,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
@@ -444,8 +488,10 @@ namespace remotestore_impl {
IgnoreMissingAttachments,
OptionalContext,
RetriesLeft,
- Chunks = std::move(Chunks),
- Bytes = std::move(BlockResult.Bytes)]() {
+ BlockHash = IoHash(BlockHash),
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ Bytes = std::move(BlockResult.Bytes)]() {
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -454,9 +500,6 @@ namespace remotestore_impl {
try
{
ZEN_ASSERT(Bytes.Size() > 0);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
std::vector<IoBuffer> WriteAttachmentBuffers;
std::vector<IoHash> WriteRawHashes;
@@ -485,7 +528,8 @@ namespace remotestore_impl {
LoadAttachmentsTimer,
DownloadStartMS,
BlockHash,
- std::move(Chunks),
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
RetriesLeft - 1);
}
ReportMessage(
@@ -519,7 +563,8 @@ namespace remotestore_impl {
LoadAttachmentsTimer,
DownloadStartMS,
BlockHash,
- std::move(Chunks),
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
RetriesLeft - 1);
}
ReportMessage(OptionalContext,
@@ -546,28 +591,36 @@ namespace remotestore_impl {
uint64_t BlockSize = BlockPayload.GetSize();
uint64_t BlockHeaderSize = 0;
- bool StoreChunksOK = IterateChunkBlock(
- BlockPayload.Flatten(),
- [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize](
- CompressedBuffer&& Chunk,
- const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- IoHash RawHash;
- uint64_t RawSize;
- ZEN_ASSERT(
- CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
- RawHash,
- RawSize,
- /*OutOptionalTotalCompressedSize*/ nullptr));
- ZEN_ASSERT(RawHash == AttachmentRawHash);
- WriteRawHashes.emplace_back(AttachmentRawHash);
- WantedChunks.erase(AttachmentRawHash);
- PotentialSize += WriteAttachmentBuffers.back().GetSize();
- }
- },
- BlockHeaderSize);
+
+ bool StoreChunksOK = IterateChunkBlock(
+ BlockPayload.Flatten(),
+ [&AllNeededPartialChunkHashesLookup,
+ &ChunkDownloadedFlags,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
+ &Info,
+ &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash);
+ if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
+ {
+ bool Expected = false;
+ if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(
+ CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ PotentialSize += WriteAttachmentBuffers.back().GetSize();
+ }
+ }
+ },
+ BlockHeaderSize);
if (!StoreChunksOK)
{
@@ -582,8 +635,6 @@ namespace remotestore_impl {
return;
}
- ZEN_ASSERT(WantedChunks.empty());
-
if (!WriteAttachmentBuffers.empty())
{
auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
@@ -625,6 +676,293 @@ namespace remotestore_impl {
WorkerThreadPool::EMode::EnableBacklog);
};
+ // Downloads selected byte ranges of one chunk block and stores the still-needed chunks they contain.
+ //
+ // One task is queued on NetworkWorkerPool. It walks
+ // BlockRangeDescriptors[BlockRangeIndexStart, BlockRangeIndexStart + BlockRangeCount) and requests each
+ // range of BlockDescription.BlockHash from RemoteStore. Every range that arrives with the expected
+ // length is handed to a task on WorkerPool, which slices the payload into chunks using
+ // BlockDescription.ChunkCompressedLengths, validates each wanted chunk (compressed header, raw hash,
+ // raw size) and adds it to ChunkStore.
+ //
+ // A chunk is wanted when its hash is a key in AllNeededPartialChunkHashesLookup and this call is the
+ // one that flips the matching ChunkDownloadedFlags entry from false to true.
+ //
+ // AttachmentsDownloadLatch is counted up once here and down when the network task exits;
+ // AttachmentsWriteLatch is counted up once per scheduled store task and down when that task exits.
+ //
+ // A failed or short range download bumps Info.MissingAttachmentCount, sets RemoteResult unless
+ // IgnoreMissingAttachments is set, and abandons the remaining ranges. A chunk that fails validation
+ // is reported the same way, but the remaining chunks of the range are still processed.
+ //
+ // RetriesLeft is captured by the tasks but no retry is performed in this function.
+ // NOTE(review): the reference and span arguments are used from the scheduled tasks - presumably the
+ // caller keeps them alive until both latches are released; confirm against the call site.
+ void DownloadAndSavePartialBlock(CidStore& ChunkStore,
+                                  RemoteProjectStore& RemoteStore,
+                                  bool IgnoreMissingAttachments,
+                                  JobContext* OptionalContext,
+                                  WorkerThreadPool& NetworkWorkerPool,
+                                  WorkerThreadPool& WorkerPool,
+                                  Latch& AttachmentsDownloadLatch,
+                                  Latch& AttachmentsWriteLatch,
+                                  AsyncRemoteResult& RemoteResult,
+                                  DownloadInfo& Info,
+                                  Stopwatch& LoadAttachmentsTimer,
+                                  std::atomic_uint64_t& DownloadStartMS,
+                                  const ChunkBlockDescription& BlockDescription,
+                                  std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors,
+                                  size_t BlockRangeIndexStart,
+                                  size_t BlockRangeCount,
+                                  const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup,
+                                  std::span<std::atomic<bool>> ChunkDownloadedFlags,
+                                  uint32_t RetriesLeft)
+ {
+     AttachmentsDownloadLatch.AddCount(1);
+     NetworkWorkerPool.ScheduleWork(
+         [&AttachmentsDownloadLatch,
+          &AttachmentsWriteLatch,
+          &ChunkStore,
+          &RemoteStore,
+          &NetworkWorkerPool,
+          &WorkerPool,
+          &RemoteResult,
+          &Info,
+          &LoadAttachmentsTimer,
+          &DownloadStartMS,
+          IgnoreMissingAttachments,
+          OptionalContext,
+          RetriesLeft,
+          BlockDescription,
+          BlockRangeDescriptors,
+          BlockRangeIndexStart,
+          BlockRangeCount,
+          &AllNeededPartialChunkHashesLookup,
+          ChunkDownloadedFlags]() {
+             ZEN_TRACE_CPU("DownloadBlockRanges");
+
+             auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
+             try
+             {
+                 // Record the time of the first download; only the first task to get here wins the exchange.
+                 uint64_t Unset = (std::uint64_t)-1;
+                 DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+
+                 double DownloadElapsedSeconds = 0;
+                 uint64_t DownloadedBytes = 0;
+
+                 for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount;
+                      BlockRangeIndex++)
+                 {
+                     if (RemoteResult.IsError())
+                     {
+                         return;
+                     }
+
+                     const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex];
+
+                     RemoteProjectStore::LoadAttachmentResult BlockResult =
+                         RemoteStore.LoadAttachment(BlockDescription.BlockHash,
+                                                    {.Offset = BlockRange.RangeStart, .Bytes = BlockRange.RangeLength});
+                     if (BlockResult.ErrorCode)
+                     {
+                         ReportMessage(OptionalContext,
+                                       fmt::format("Failed to download block attachment '{}' range {},{} ({}): {}",
+                                                   BlockDescription.BlockHash,
+                                                   BlockRange.RangeStart,
+                                                   BlockRange.RangeLength,
+                                                   BlockResult.ErrorCode,
+                                                   BlockResult.Reason));
+                         Info.MissingAttachmentCount.fetch_add(1);
+                         if (!IgnoreMissingAttachments)
+                         {
+                             RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+                         }
+                         return;
+                     }
+                     if (RemoteResult.IsError())
+                     {
+                         return;
+                     }
+                     uint64_t BlockPartSize = BlockResult.Bytes.GetSize();
+                     if (BlockPartSize != BlockRange.RangeLength)
+                     {
+                         std::string ErrorString =
+                             fmt::format("Failed to download block attachment '{}' range {},{}, got {} bytes ({}): {}",
+                                         BlockDescription.BlockHash,
+                                         BlockRange.RangeStart,
+                                         BlockRange.RangeLength,
+                                         BlockPartSize,
+                                         RemoteResult.GetError(),
+                                         RemoteResult.GetErrorReason());
+
+                         ReportMessage(OptionalContext, ErrorString);
+                         Info.MissingAttachmentCount.fetch_add(1);
+                         if (!IgnoreMissingAttachments)
+                         {
+                             RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+                                                   "Mismatching block part range received",
+                                                   ErrorString);
+                         }
+                         return;
+                     }
+                     Info.AttachmentBlocksRangesDownloaded.fetch_add(1);
+
+                     DownloadElapsedSeconds += BlockResult.ElapsedSeconds;
+                     DownloadedBytes += BlockPartSize;
+
+                     Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize);
+
+                     AttachmentsWriteLatch.AddCount(1);
+                     WorkerPool.ScheduleWork(
+                         [&AttachmentsDownloadLatch,
+                          &AttachmentsWriteLatch,
+                          &ChunkStore,
+                          &RemoteStore,
+                          &NetworkWorkerPool,
+                          &WorkerPool,
+                          &RemoteResult,
+                          &Info,
+                          &LoadAttachmentsTimer,
+                          &DownloadStartMS,
+                          IgnoreMissingAttachments,
+                          OptionalContext,
+                          RetriesLeft,
+                          BlockDescription,
+                          BlockRange,
+                          &AllNeededPartialChunkHashesLookup,
+                          ChunkDownloadedFlags,
+                          BlockPayload = std::move(BlockResult.Bytes)]() {
+                             auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+                             if (RemoteResult.IsError())
+                             {
+                                 return;
+                             }
+                             try
+                             {
+                                 ZEN_ASSERT(BlockPayload.Size() > 0);
+                                 std::vector<IoBuffer> WriteAttachmentBuffers;
+                                 std::vector<IoHash> WriteRawHashes;
+
+                                 uint64_t PotentialSize = 0;
+                                 uint64_t UsedSize = 0;
+                                 uint64_t BlockPartSize = BlockPayload.GetSize();
+
+                                 // Running offset of the next chunk inside this range payload.
+                                 uint32_t OffsetInBlock = 0;
+                                 for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart;
+                                      ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount;
+                                      ChunkBlockIndex++)
+                                 {
+                                     const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+                                     const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+
+                                     // Claim this chunk's slice and advance the running offset up front. The
+                                     // validation failures below `continue`, and the offset must still move past
+                                     // the bad chunk or every following chunk would be read from the wrong place.
+                                     const uint32_t ChunkOffset = OffsetInBlock;
+                                     OffsetInBlock += ChunkCompressedSize;
+
+                                     if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash);
+                                         ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
+                                     {
+                                         // Only the task that flips the flag stores the chunk.
+                                         // NOTE(review): the flag stays set when validation fails below - confirm
+                                         // that is the intended behaviour for consumers of ChunkDownloadedFlags.
+                                         bool Expected = false;
+                                         if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true))
+                                         {
+                                             IoHash VerifyChunkHash;
+                                             uint64_t VerifyChunkSize;
+                                             CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(
+                                                 SharedBuffer(IoBuffer(BlockPayload, ChunkOffset, ChunkCompressedSize)),
+                                                 VerifyChunkHash,
+                                                 VerifyChunkSize);
+                                             if (!CompressedChunk)
+                                             {
+                                                 std::string ErrorString = fmt::format(
+                                                     "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer",
+                                                     ChunkOffset,
+                                                     ChunkCompressedSize,
+                                                     BlockDescription.BlockHash);
+                                                 ReportMessage(OptionalContext, ErrorString);
+                                                 Info.MissingAttachmentCount.fetch_add(1);
+                                                 if (!IgnoreMissingAttachments)
+                                                 {
+                                                     RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+                                                                           "Malformed chunk block",
+                                                                           ErrorString);
+                                                 }
+                                                 continue;
+                                             }
+                                             if (VerifyChunkHash != ChunkHash)
+                                             {
+                                                 std::string ErrorString = fmt::format(
+                                                     "Chunk at {},{} in block attachment '{}' has mismatching hash, expected {}, got {}",
+                                                     ChunkOffset,
+                                                     ChunkCompressedSize,
+                                                     BlockDescription.BlockHash,
+                                                     ChunkHash,
+                                                     VerifyChunkHash);
+                                                 ReportMessage(OptionalContext, ErrorString);
+                                                 Info.MissingAttachmentCount.fetch_add(1);
+                                                 if (!IgnoreMissingAttachments)
+                                                 {
+                                                     RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+                                                                           "Malformed chunk block",
+                                                                           ErrorString);
+                                                 }
+                                                 continue;
+                                             }
+                                             if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex])
+                                             {
+                                                 std::string ErrorString = fmt::format(
+                                                     "Chunk at {},{} in block attachment '{}' has mismatching raw size, expected {}, "
+                                                     "got {}",
+                                                     ChunkOffset,
+                                                     ChunkCompressedSize,
+                                                     BlockDescription.BlockHash,
+                                                     BlockDescription.ChunkRawLengths[ChunkBlockIndex],
+                                                     VerifyChunkSize);
+                                                 ReportMessage(OptionalContext, ErrorString);
+                                                 Info.MissingAttachmentCount.fetch_add(1);
+                                                 if (!IgnoreMissingAttachments)
+                                                 {
+                                                     RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+                                                                           "Malformed chunk block",
+                                                                           ErrorString);
+                                                 }
+                                                 continue;
+                                             }
+
+                                             WriteAttachmentBuffers.emplace_back(CompressedChunk.GetCompressed().Flatten().AsIoBuffer());
+                                             WriteRawHashes.emplace_back(ChunkHash);
+                                             PotentialSize += WriteAttachmentBuffers.back().GetSize();
+                                         }
+                                     }
+                                 }
+
+                                 if (!WriteAttachmentBuffers.empty())
+                                 {
+                                     auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+                                     for (size_t Index = 0; Index < Results.size(); Index++)
+                                     {
+                                         const auto& Result = Results[Index];
+                                         // Only chunks the store did not already hold count as stored/used.
+                                         if (Result.New)
+                                         {
+                                             Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+                                             Info.AttachmentsStored.fetch_add(1);
+                                             UsedSize += WriteAttachmentBuffers[Index].GetSize();
+                                         }
+                                     }
+                                     ZEN_DEBUG("Used {} (matching {}) out of {} for block {} range {}, {} ({} %) (use of matching {}%)",
+                                               NiceBytes(UsedSize),
+                                               NiceBytes(PotentialSize),
+                                               NiceBytes(BlockPartSize),
+                                               BlockDescription.BlockHash,
+                                               BlockRange.RangeStart,
+                                               BlockRange.RangeLength,
+                                               (100 * UsedSize) / BlockPartSize,
+                                               PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0);
+                                 }
+                             }
+                             catch (const std::exception& Ex)
+                             {
+                                 RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+                                                       fmt::format("Failed save block attachment {} range {}, {}",
+                                                                   BlockDescription.BlockHash,
+                                                                   BlockRange.RangeStart,
+                                                                   BlockRange.RangeLength),
+                                                       Ex.what());
+                             }
+                         },
+                         WorkerThreadPool::EMode::EnableBacklog);
+                 }
+
+                 ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})",
+                           BlockRangeCount,
+                           BlockDescription.BlockHash,
+                           NiceTimeSpanMs(static_cast<uint64_t>(DownloadElapsedSeconds * 1000)),
+                           NiceBytes(DownloadedBytes));
+             }
+             catch (const std::exception& Ex)
+             {
+                 RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+                                       fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash),
+                                       Ex.what());
+             }
+         },
+         WorkerThreadPool::EMode::EnableBacklog);
+ };
+
void DownloadAndSaveAttachment(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
bool IgnoreMissingAttachments,
@@ -664,7 +1002,7 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash, {});
if (AttachmentResult.ErrorCode)
{
ReportMessage(OptionalContext,
@@ -680,10 +1018,10 @@ namespace remotestore_impl {
return;
}
uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_INFO("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
+ ZEN_DEBUG("Loaded large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(AttachmentSize));
Info.AttachmentsDownloaded.fetch_add(1);
if (RemoteResult.IsError())
{
@@ -1224,35 +1562,7 @@ BuildContainer(CidStore& ChunkStore,
{
using namespace std::literals;
- class JobContextLogOutput : public OperationLogOutput
- {
- public:
- JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {}
- virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override
- {
- ZEN_UNUSED(LogLevel);
- if (m_OptionalContext)
- {
- fmt::basic_memory_buffer<char, 250> MessageBuffer;
- fmt::vformat_to(fmt::appender(MessageBuffer), Format, Args);
- remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size()));
- }
- }
-
- virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); }
- virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); }
- virtual uint32_t GetProgressUpdateDelayMS() override { return 0; }
- virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override
- {
- ZEN_UNUSED(InSubTask);
- return nullptr;
- }
-
- private:
- JobContext* m_OptionalContext;
- };
-
- std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<JobContextLogOutput>(OptionalContext));
+ std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext));
size_t OpCount = 0;
@@ -2768,14 +3078,15 @@ SaveOplog(CidStore& ChunkStore,
};
RemoteProjectStore::Result
-ParseOplogContainer(const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- CbObject& OutOplogSection,
- JobContext* OptionalContext)
+ParseOplogContainer(
+ const CbObject& ContainerObject,
+ const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ CbObject& OutOplogSection,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2801,12 +3112,12 @@ ParseOplogContainer(const CbObject& ContainerObject,
"Section has unexpected data type",
"Failed to save oplog container"};
}
- std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments;
+ std::unordered_set<IoHash, IoHash::Hasher> NeededAttachments;
{
CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView();
for (CbFieldView OpEntry : OpsArray)
{
- OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); });
+ OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); });
if (remotestore_impl::IsCancelled(OptionalContext))
{
return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
@@ -2816,7 +3127,7 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
{
- std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end());
+ std::vector<IoHash> ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end());
OnReferencedAttachments(ReferencedAttachments);
}
@@ -2827,24 +3138,27 @@ ParseOplogContainer(const CbObject& ContainerObject,
.Reason = "Operation cancelled"};
}
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", NeededAttachments.size()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
for (CbFieldView ChunkedFileField : ChunkedFilesArray)
{
CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView();
IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash();
- if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash)))
+ if (NeededAttachments.erase(RawHash) == 1)
{
- ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView);
-
- OnReferencedAttachments(Chunked.ChunkHashes);
- OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end());
- OnChunkedAttachment(Chunked);
- ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- Chunked.ChunkHashes.size());
+ if (!HasAttachment(RawHash))
+ {
+ ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView);
+
+ OnReferencedAttachments(Chunked.ChunkHashes);
+ NeededAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end());
+ OnChunkedAttachment(Chunked);
+ ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ Chunked.ChunkHashes.size());
+ }
}
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -2854,6 +3168,8 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
+ std::vector<ThinChunkBlockDescription> ThinBlocksDescriptions;
+
size_t NeedBlockCount = 0;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
@@ -2863,45 +3179,38 @@ ParseOplogContainer(const CbObject& ContainerObject,
CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(ChunksArray.Num());
- if (BlockHash == IoHash::Zero)
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkField : ChunksArray)
{
- for (CbFieldView ChunkField : ChunksArray)
- {
- IoHash ChunkHash = ChunkField.AsBinaryAttachment();
- if (OpsAttachments.erase(ChunkHash) == 1)
- {
- if (!HasAttachment(ChunkHash))
- {
- NeededChunks.emplace_back(ChunkHash);
- }
- }
- }
+ ChunkHashes.push_back(ChunkField.AsHash());
}
- else
+ ThinBlocksDescriptions.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)});
+ }
+
+ for (ThinChunkBlockDescription& ThinBlockDescription : ThinBlocksDescriptions)
+ {
+ std::vector<uint32_t> NeededBlockChunkIndexes;
+ for (uint32_t ChunkIndex = 0; ChunkIndex < ThinBlockDescription.ChunkRawHashes.size(); ChunkIndex++)
{
- for (CbFieldView ChunkField : ChunksArray)
+ const IoHash& ChunkHash = ThinBlockDescription.ChunkRawHashes[ChunkIndex];
+ if (NeededAttachments.erase(ChunkHash) == 1)
{
- const IoHash ChunkHash = ChunkField.AsHash();
- if (OpsAttachments.erase(ChunkHash) == 1)
+ if (!HasAttachment(ChunkHash))
{
- if (!HasAttachment(ChunkHash))
- {
- NeededChunks.emplace_back(ChunkHash);
- }
+ NeededBlockChunkIndexes.push_back(ChunkIndex);
}
}
}
-
- if (!NeededChunks.empty())
+ if (!NeededBlockChunkIndexes.empty())
{
- OnNeedBlock(BlockHash, std::move(NeededChunks));
- if (BlockHash != IoHash::Zero)
+ if (ThinBlockDescription.BlockHash != IoHash::Zero)
{
NeedBlockCount++;
}
+ OnNeedBlock(std::move(ThinBlockDescription), std::move(NeededBlockChunkIndexes));
}
+
if (remotestore_impl::IsCancelled(OptionalContext))
{
return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
@@ -2909,6 +3218,7 @@ ParseOplogContainer(const CbObject& ContainerObject,
.Reason = "Operation cancelled"};
}
}
+
remotestore_impl::ReportMessage(OptionalContext,
fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
@@ -2918,7 +3228,7 @@ ParseOplogContainer(const CbObject& ContainerObject,
{
IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
- if (OpsAttachments.erase(AttachmentHash) == 1)
+ if (NeededAttachments.erase(AttachmentHash) == 1)
{
if (!HasAttachment(AttachmentHash))
{
@@ -2941,14 +3251,15 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
RemoteProjectStore::Result
-SaveOplogContainer(ProjectStore::Oplog& Oplog,
- const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- JobContext* OptionalContext)
+SaveOplogContainer(
+ ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2972,18 +3283,23 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
RemoteProjectStore::Result
-LoadOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- bool ForceDownload,
- bool IgnoreMissingAttachments,
- bool CleanOplog,
- JobContext* OptionalContext)
+LoadOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ bool ForceDownload,
+ bool IgnoreMissingAttachments,
+ bool CleanOplog,
+ EPartialBlockRequestMode PartialBlockRequestMode,
+ double HostLatencySec,
+ double CacheLatencySec,
+ JobContext* OptionalContext)
{
using namespace std::literals;
+ std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext));
+
remotestore_impl::DownloadInfo Info;
Stopwatch Timer;
@@ -3035,6 +3351,14 @@ LoadOplog(CidStore& ChunkStore,
return false;
};
+ struct NeededBlockDownload
+ {
+ ThinChunkBlockDescription ThinBlockDescription;
+ std::vector<uint32_t> NeededChunkIndexes;
+ };
+
+ std::vector<NeededBlockDownload> NeededBlockDownloads;
+
auto OnNeedBlock = [&RemoteStore,
&ChunkStore,
&NetworkWorkerPool,
@@ -3047,8 +3371,9 @@ LoadOplog(CidStore& ChunkStore,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
+ &NeededBlockDownloads,
IgnoreMissingAttachments,
- OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) {
+ OptionalContext](ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes) {
if (RemoteResult.IsError())
{
return;
@@ -3056,7 +3381,7 @@ LoadOplog(CidStore& ChunkStore,
BlockCountToDownload++;
AttachmentCount.fetch_add(1);
- if (BlockHash == IoHash::Zero)
+ if (ThinBlockDescription.BlockHash == IoHash::Zero)
{
DownloadAndSaveBlockChunks(ChunkStore,
RemoteStore,
@@ -3070,25 +3395,13 @@ LoadOplog(CidStore& ChunkStore,
Info,
LoadAttachmentsTimer,
DownloadStartMS,
- Chunks);
+ std::move(ThinBlockDescription),
+ std::move(NeededChunkIndexes));
}
else
{
- DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- Chunks,
- 3);
+ NeededBlockDownloads.push_back(NeededBlockDownload{.ThinBlockDescription = std::move(ThinBlockDescription),
+ .NeededChunkIndexes = std::move(NeededChunkIndexes)});
}
};
@@ -3132,12 +3445,7 @@ LoadOplog(CidStore& ChunkStore,
};
std::vector<ChunkedInfo> FilesToDechunk;
- auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) {
- if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash))
- {
- FilesToDechunk.push_back(Chunked);
- }
- };
+ auto OnChunkedAttachment = [&FilesToDechunk](const ChunkedInfo& Chunked) { FilesToDechunk.push_back(Chunked); };
auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); };
@@ -3165,6 +3473,185 @@ LoadOplog(CidStore& ChunkStore,
BlockCountToDownload,
FilesToDechunk.size()));
+ std::vector<IoHash> BlockHashes;
+ std::vector<IoHash> AllNeededChunkHashes;
+ BlockHashes.reserve(NeededBlockDownloads.size());
+ for (const NeededBlockDownload& BlockDownload : NeededBlockDownloads)
+ {
+ BlockHashes.push_back(BlockDownload.ThinBlockDescription.BlockHash);
+ for (uint32_t ChunkIndex : BlockDownload.NeededChunkIndexes)
+ {
+ AllNeededChunkHashes.push_back(BlockDownload.ThinBlockDescription.ChunkRawHashes[ChunkIndex]);
+ }
+ }
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllNeededPartialChunkHashesLookup = BuildHashLookup(AllNeededChunkHashes);
+ std::vector<std::atomic<bool>> ChunkDownloadedFlags(AllNeededChunkHashes.size());
+ std::vector<bool> DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false);
+ ChunkBlockAnalyser::BlockResult PartialBlocksResult;
+
+ RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = RemoteStore.GetBlockDescriptions(BlockHashes);
+ std::vector<IoHash> BlocksWithDescription;
+ BlocksWithDescription.reserve(BlockDescriptions.Blocks.size());
+ for (const ChunkBlockDescription& BlockDescription : BlockDescriptions.Blocks)
+ {
+ BlocksWithDescription.push_back(BlockDescription.BlockHash);
+ }
+ {
+ auto WantIt = NeededBlockDownloads.begin();
+ auto FindIt = BlockDescriptions.Blocks.begin();
+ while (WantIt != NeededBlockDownloads.end())
+ {
+ if (FindIt == BlockDescriptions.Blocks.end())
+ {
+ // Fall back to full download as we can't get enough information about the block
+ DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ WantIt->ThinBlockDescription.BlockHash,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ 3);
+ for (uint32_t BlockChunkIndex : WantIt->NeededChunkIndexes)
+ {
+ const IoHash& ChunkHash = WantIt->ThinBlockDescription.ChunkRawHashes[BlockChunkIndex];
+ auto It = AllNeededPartialChunkHashesLookup.find(ChunkHash);
+ ZEN_ASSERT(It != AllNeededPartialChunkHashesLookup.end());
+ uint32_t ChunkIndex = It->second;
+ DownloadedViaLegacyChunkFlag[ChunkIndex] = true;
+ }
+ WantIt++;
+ }
+ else if (WantIt->ThinBlockDescription.BlockHash == FindIt->BlockHash)
+ {
+ // Found
+ FindIt++;
+ WantIt++;
+ }
+ else
+ {
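+ // Description is not for the next wanted block; this walk assumes descriptions come back in request order with only trailing ones missing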
+ ZEN_ASSERT(false);
+ }
+ }
+ }
+ if (!AllNeededChunkHashes.empty())
+ {
+ std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes;
+
+ if (PartialBlockRequestMode == EPartialBlockRequestMode::Off)
+ {
+ PartialBlockDownloadModes.resize(BlocksWithDescription.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ }
+ else
+ {
+ RemoteProjectStore::AttachmentExistsInCacheResult CacheExistsResult =
+ RemoteStore.AttachmentExistsInCache(BlocksWithDescription);
+ if (CacheExistsResult.ErrorCode != 0 || CacheExistsResult.HasBody.size() != BlocksWithDescription.size())
+ {
+ CacheExistsResult.HasBody.resize(BlocksWithDescription.size(), false);
+ }
+
+ PartialBlockDownloadModes.reserve(BlocksWithDescription.size());
+
+ for (bool ExistsInCache : CacheExistsResult.HasBody)
+ {
+ if (PartialBlockRequestMode == EPartialBlockRequestMode::All)
+ {
+ PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange);
+ }
+ else if (PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly)
+ {
+ PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ }
+ else if (PartialBlockRequestMode == EPartialBlockRequestMode::Mixed)
+ {
+ PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange);
+ }
+ }
+ }
+
+ ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size());
+
+ ChunkBlockAnalyser PartialAnalyser(*LogOutput,
+ BlockDescriptions.Blocks,
+ ChunkBlockAnalyser::Options{.IsQuiet = false,
+ .IsVerbose = false,
+ .HostLatencySec = HostLatencySec,
+ .HostHighSpeedLatencySec = CacheLatencySec});
+
+ std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks =
+ PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup,
+ [&](uint32_t ChunkIndex) { return !DownloadedViaLegacyChunkFlag[ChunkIndex]; });
+
+ PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes);
+ for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes)
+ {
+ DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescriptions.Blocks[FullBlockIndex].BlockHash,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ 3);
+ }
+
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();)
+ {
+ size_t RangeCount = 1;
+ size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex;
+ const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex];
+ while (RangeCount < RangesLeft &&
+ CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex)
+ {
+ RangeCount++;
+ }
+
+ DownloadAndSavePartialBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex],
+ PartialBlocksResult.BlockRanges,
+ BlockRangeIndex,
+ RangeCount,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ 3);
+
+ BlockRangeIndex += RangeCount;
+ }
+ }
+
AttachmentsDownloadLatch.CountDown();
while (!AttachmentsDownloadLatch.Wait(1000))
{
@@ -3478,21 +3965,30 @@ LoadOplog(CidStore& ChunkStore,
}
}
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}",
- RemoteStoreInfo.ContainerName,
- Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- NiceBytes(Info.OplogSizeBytes),
- Info.AttachmentBlocksDownloaded.load(),
- NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
- Info.AttachmentsDownloaded.load(),
- NiceBytes(Info.AttachmentBytesDownloaded.load()),
- Info.AttachmentsStored.load(),
- NiceBytes(Info.AttachmentBytesStored.load()),
- Info.MissingAttachmentCount.load(),
- remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
+ uint64_t TotalDownloads =
+ 1 + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
+ uint64_t TotalBytesDownloaded = Info.OplogSizeBytes + Info.AttachmentBlockBytesDownloaded.load() +
+ Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load();
+
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} "
+ "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}",
+ RemoteStoreInfo.ContainerName,
+ Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ NiceBytes(Info.OplogSizeBytes),
+ Info.AttachmentBlocksDownloaded.load(),
+ NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
+ Info.AttachmentBlocksRangesDownloaded.load(),
+ NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()),
+ Info.AttachmentsDownloaded.load(),
+ NiceBytes(Info.AttachmentBytesDownloaded.load()),
+ TotalDownloads,
+ NiceBytes(TotalBytesDownloaded),
+ Info.AttachmentsStored.load(),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Info.MissingAttachmentCount.load(),
+ remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
return Result;
}
@@ -3697,6 +4193,9 @@ TEST_CASE_TEMPLATE("project.store.export",
/*Force*/ false,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ false,
+ EPartialBlockRequestMode::Mixed,
+ /*HostLatencySec*/ -1.0,
+ /*CacheLatencySec*/ -1.0,
nullptr);
CHECK(ImportResult.ErrorCode == 0);
@@ -3708,6 +4207,9 @@ TEST_CASE_TEMPLATE("project.store.export",
/*Force*/ true,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ false,
+ EPartialBlockRequestMode::Mixed,
+ /*HostLatencySec*/ -1.0,
+ /*CacheLatencySec*/ -1.0,
nullptr);
CHECK(ImportForceResult.ErrorCode == 0);
@@ -3719,6 +4221,9 @@ TEST_CASE_TEMPLATE("project.store.export",
/*Force*/ false,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ true,
+ EPartialBlockRequestMode::Mixed,
+ /*HostLatencySec*/ -1.0,
+ /*CacheLatencySec*/ -1.0,
nullptr);
CHECK(ImportCleanResult.ErrorCode == 0);
@@ -3730,6 +4235,9 @@ TEST_CASE_TEMPLATE("project.store.export",
/*Force*/ true,
/*IgnoreMissingAttachments*/ false,
/*CleanOplog*/ true,
+ EPartialBlockRequestMode::Mixed,
+ /*HostLatencySec*/ -1.0,
+ /*CacheLatencySec*/ -1.0,
nullptr);
CHECK(ImportForceCleanResult.ErrorCode == 0);
}