aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp2522
1 files changed, 1943 insertions, 579 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 8be8eb0df..247bd6cb9 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -14,6 +14,8 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
+#include <zenremotestore/builds/buildstoragecache.h>
+#include <zenremotestore/chunking/chunkedcontent.h>
#include <zenremotestore/chunking/chunkedfile.h>
#include <zenremotestore/operationlogoutput.h>
#include <zenstore/cidstore.h>
@@ -123,14 +125,17 @@ namespace remotestore_impl {
return OptionalContext->IsCancelled();
}
- std::string GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS)
+ std::string GetStats(const RemoteProjectStore::Stats& Stats,
+ const BuildStorageCache::Statistics* OptionalCacheStats,
+ uint64_t ElapsedWallTimeMS)
{
- return fmt::format(
- "Sent: {} ({}bits/s) Recv: {} ({}bits/s)",
- NiceBytes(Stats.m_SentBytes),
- NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u),
- NiceBytes(Stats.m_ReceivedBytes),
- NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u));
+ uint64_t SentBytes = Stats.m_SentBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesWritten.load() : 0);
+ uint64_t ReceivedBytes = Stats.m_ReceivedBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesRead.load() : 0);
+ return fmt::format("Sent: {} ({}bits/s) Recv: {} ({}bits/s)",
+ NiceBytes(SentBytes),
+ NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u),
+ NiceBytes(ReceivedBytes),
+ NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u));
}
void LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
@@ -229,44 +234,66 @@ namespace remotestore_impl {
struct DownloadInfo
{
- uint64_t OplogSizeBytes = 0;
- std::atomic<uint64_t> AttachmentsDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlocksDownloaded = 0;
- std::atomic<uint64_t> AttachmentBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentsStored = 0;
- std::atomic<uint64_t> AttachmentBytesStored = 0;
- std::atomic_size_t MissingAttachmentCount = 0;
+ uint64_t OplogSizeBytes = 0;
+ std::atomic<uint64_t> AttachmentsDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlocksDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlocksRangesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlockRangeBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentsStored = 0;
+ std::atomic<uint64_t> AttachmentBytesStored = 0;
+ std::atomic_size_t MissingAttachmentCount = 0;
};
- void DownloadAndSaveBlockChunks(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- Stopwatch& LoadAttachmentsTimer,
- std::atomic_uint64_t& DownloadStartMS,
- const std::vector<IoHash>& Chunks)
+ class JobContextLogOutput : public OperationLogOutput
+ {
+ public:
+ JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {}
+ virtual void EmitLogMessage(const logging::LogPoint& Point, fmt::format_args Args) override
+ {
+ if (m_OptionalContext)
+ {
+ fmt::basic_memory_buffer<char, 250> MessageBuffer;
+ fmt::vformat_to(fmt::appender(MessageBuffer), Point.FormatString, Args);
+ remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size()));
+ }
+ }
+
+ virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); }
+ virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); }
+ virtual uint32_t GetProgressUpdateDelayMS() override { return 0; }
+ virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override
+ {
+ ZEN_UNUSED(InSubTask);
+ return nullptr;
+ }
+
+ private:
+ JobContext* m_OptionalContext;
+ };
+
+ void DownloadAndSaveBlockChunks(LoadOplogContext& Context,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ ThinChunkBlockDescription&& ThinBlockDescription,
+ std::vector<uint32_t>&& NeededChunkIndexes)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
- [&RemoteStore,
- &ChunkStore,
- &WorkerPool,
+ Context.NetworkWorkerPool.ScheduleWork(
+ [&Context,
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
&RemoteResult,
- Chunks = Chunks,
+ ThinBlockDescription = std::move(ThinBlockDescription),
+ NeededChunkIndexes = std::move(NeededChunkIndexes),
&Info,
&LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext]() {
+ &DownloadStartMS]() {
ZEN_TRACE_CPU("DownloadBlockChunks");
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
@@ -276,34 +303,47 @@ namespace remotestore_impl {
}
try
{
+ std::vector<IoHash> Chunks;
+ Chunks.reserve(NeededChunkIndexes.size());
+ for (uint32_t ChunkIndex : NeededChunkIndexes)
+ {
+ Chunks.push_back(ThinBlockDescription.ChunkRawHashes[ChunkIndex]);
+ }
+
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ RemoteProjectStore::LoadAttachmentsResult Result = Context.RemoteStore.LoadAttachments(Chunks);
if (Result.ErrorCode)
{
- ReportMessage(OptionalContext,
+ ReportMessage(Context.OptionalJobContext,
fmt::format("Failed to load attachments with {} chunks ({}): {}",
Chunks.size(),
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
Info.MissingAttachmentCount.fetch_add(1);
- if (IgnoreMissingAttachments)
+ if (Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
return;
}
- Info.AttachmentsDownloaded.fetch_add(Chunks.size());
- ZEN_INFO("Loaded {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ Info.AttachmentsDownloaded.fetch_add(Result.Chunks.size());
+ for (const auto& It : Result.Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ }
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
+ fmt::format("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))));
if (RemoteResult.IsError())
{
return;
}
AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
+ Context.WorkerPool.ScheduleWork(
+ [&AttachmentsWriteLatch, &RemoteResult, &Info, &Context, Chunks = std::move(Result.Chunks)]() {
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -320,13 +360,13 @@ namespace remotestore_impl {
for (const auto& It : Chunks)
{
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
WriteRawHashes.push_back(It.first);
}
std::vector<CidStore::InsertResult> InsertResults =
- ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+ Context.ChunkStore.AddChunks(WriteAttachmentBuffers,
+ WriteRawHashes,
+ CidStore::InsertMode::kCopyOnly);
for (size_t Index = 0; Index < InsertResults.size(); Index++)
{
@@ -350,46 +390,38 @@ namespace remotestore_impl {
catch (const std::exception& Ex)
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk load {} attachments", Chunks.size()),
+ fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()),
Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
};
- void DownloadAndSaveBlock(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- Stopwatch& LoadAttachmentsTimer,
- std::atomic_uint64_t& DownloadStartMS,
- const IoHash& BlockHash,
- const std::vector<IoHash>& Chunks,
- uint32_t RetriesLeft)
+ void DownloadAndSaveBlock(LoadOplogContext& Context,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ const IoHash& BlockHash,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup,
+ std::span<std::atomic<bool>> ChunkDownloadedFlags,
+ uint32_t RetriesLeft)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
+ Context.NetworkWorkerPool.ScheduleWork(
[&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
- BlockHash,
+ &Context,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
RetriesLeft,
- Chunks = std::vector<IoHash>(Chunks)]() {
+ BlockHash = IoHash(BlockHash),
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags]() {
ZEN_TRACE_CPU("DownloadBlock");
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
@@ -401,51 +433,65 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
- if (BlockResult.ErrorCode)
+
+ IoBuffer BlobBuffer;
+ if (Context.OptionalCache)
{
- ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment {} ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- }
- return;
+ BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, BlockHash);
}
- if (RemoteResult.IsError())
+
+ if (!BlobBuffer)
{
- return;
+ RemoteProjectStore::LoadAttachmentResult BlockResult = Context.RemoteStore.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode)
+ {
+ ReportMessage(Context.OptionalJobContext,
+ fmt::format("Failed to download block attachment {} ({}): {}",
+ BlockHash,
+ BlockResult.Reason,
+ BlockResult.Text));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!Context.IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ }
+ return;
+ }
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ BlobBuffer = std::move(BlockResult.Bytes);
+ ZEN_DEBUG("Loaded block attachment '{}' in {} ({})",
+ BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlobBuffer.Size()));
+ if (Context.OptionalCache && Context.PopulateCache)
+ {
+ Context.OptionalCache->PutBuildBlob(Context.CacheBuildId,
+ BlockHash,
+ BlobBuffer.GetContentType(),
+ CompositeBuffer(SharedBuffer(BlobBuffer)));
+ }
}
- uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ uint64_t BlockSize = BlobBuffer.GetSize();
Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {} ({})",
- BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
- NiceBytes(BlockSize));
Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
+ Context.WorkerPool.ScheduleWork(
[&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
- BlockHash,
+ &Context,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
RetriesLeft,
- Chunks = std::move(Chunks),
- Bytes = std::move(BlockResult.Bytes)]() {
+ BlockHash = IoHash(BlockHash),
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ Bytes = std::move(BlobBuffer)]() {
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -454,64 +500,107 @@ namespace remotestore_impl {
try
{
ZEN_ASSERT(Bytes.Size() > 0);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
std::vector<IoBuffer> WriteAttachmentBuffers;
std::vector<IoHash> WriteRawHashes;
IoHash RawHash;
uint64_t RawSize;
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize);
+
+ std::string ErrorString;
+
if (!Compressed)
{
- if (RetriesLeft > 0)
+ ErrorString =
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash);
+ }
+ else if (RawHash != BlockHash)
+ {
+ ErrorString = fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash);
+ }
+ else if (CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); !BlockPayload)
+ {
+ ErrorString = fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash);
+ }
+ else
+ {
+ uint64_t PotentialSize = 0;
+ uint64_t UsedSize = 0;
+ uint64_t BlockSize = BlockPayload.GetSize();
+
+ uint64_t BlockHeaderSize = 0;
+
+ bool StoreChunksOK = IterateChunkBlock(
+ BlockPayload.Flatten(),
+ [&AllNeededPartialChunkHashesLookup,
+ &ChunkDownloadedFlags,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
+ &Info,
+ &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash);
+ if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
+ {
+ bool Expected = false;
+ if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(
+ WriteAttachmentBuffers.back(),
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ PotentialSize += WriteAttachmentBuffers.back().GetSize();
+ }
+ }
+ },
+ BlockHeaderSize);
+
+ if (!StoreChunksOK)
{
- ReportMessage(
- OptionalContext,
- fmt::format(
- "Block attachment {} is malformed, can't parse as compressed binary, retrying download",
- BlockHash));
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- std::move(Chunks),
- RetriesLeft - 1);
+ ErrorString = fmt::format("Invalid format for block {}", BlockHash);
+ }
+ else
+ {
+ if (!WriteAttachmentBuffers.empty())
+ {
+ std::vector<CidStore::InsertResult> Results =
+ Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const CidStore::InsertResult& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ UsedSize += WriteAttachmentBuffers[Index].GetSize();
+ }
+ }
+ if (UsedSize < BlockSize)
+ {
+ ZEN_DEBUG("Used {} (skipping {}) out of {} for block {} ({} %) (use of matching {}%)",
+ NiceBytes(UsedSize),
+ NiceBytes(BlockSize - UsedSize),
+ NiceBytes(BlockSize),
+ BlockHash,
+ (100 * UsedSize) / BlockSize,
+ PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0);
+ }
+ }
}
- ReportMessage(
- OptionalContext,
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
- {});
- return;
}
- CompositeBuffer BlockPayload = Compressed.DecompressToComposite();
- if (!BlockPayload)
+
+ if (!ErrorString.empty())
{
if (RetriesLeft > 0)
{
- ReportMessage(
- OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download",
- BlockHash));
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString));
+
+ return DownloadAndSaveBlock(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
@@ -519,91 +608,16 @@ namespace remotestore_impl {
LoadAttachmentsTimer,
DownloadStartMS,
BlockHash,
- std::move(Chunks),
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
RetriesLeft - 1);
}
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash),
- {});
- return;
- }
- if (RawHash != BlockHash)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
- {});
- return;
- }
-
- uint64_t PotentialSize = 0;
- uint64_t UsedSize = 0;
- uint64_t BlockSize = BlockPayload.GetSize();
-
- uint64_t BlockHeaderSize = 0;
- bool StoreChunksOK = IterateChunkBlock(
- BlockPayload.Flatten(),
- [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize](
- CompressedBuffer&& Chunk,
- const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- IoHash RawHash;
- uint64_t RawSize;
- ZEN_ASSERT(
- CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
- RawHash,
- RawSize,
- /*OutOptionalTotalCompressedSize*/ nullptr));
- ZEN_ASSERT(RawHash == AttachmentRawHash);
- WriteRawHashes.emplace_back(AttachmentRawHash);
- WantedChunks.erase(AttachmentRawHash);
- PotentialSize += WriteAttachmentBuffers.back().GetSize();
- }
- },
- BlockHeaderSize);
-
- if (!StoreChunksOK)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Invalid format for block {}", BlockHash),
- {});
- return;
- }
-
- ZEN_ASSERT(WantedChunks.empty());
-
- if (!WriteAttachmentBuffers.empty())
- {
- auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (size_t Index = 0; Index < Results.size(); Index++)
+ else
{
- const auto& Result = Results[Index];
- if (Result.New)
- {
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
- UsedSize += WriteAttachmentBuffers[Index].GetSize();
- }
+ ReportMessage(Context.OptionalJobContext, ErrorString);
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), ErrorString, {});
+ return;
}
- ZEN_DEBUG("Used {} (matching {}) out of {} for block {} ({} %) (use of matching {}%)",
- NiceBytes(UsedSize),
- NiceBytes(PotentialSize),
- NiceBytes(BlockSize),
- BlockHash,
- (100 * UsedSize) / BlockSize,
- PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0);
}
}
catch (const std::exception& Ex)
@@ -618,19 +632,458 @@ namespace remotestore_impl {
catch (const std::exception& Ex)
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to block attachment {}", BlockHash),
+ fmt::format("Failed to download block attachment {}", BlockHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ };
+
+ void DownloadPartialBlock(LoadOplogContext& Context,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ double& DownloadTimeSeconds,
+ const ChunkBlockDescription& BlockDescription,
+ bool BlockExistsInCache,
+ std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors,
+ size_t BlockRangeIndexStart,
+ size_t BlockRangeCount,
+ std::function<void(IoBuffer&& Buffer,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded)
+ {
+ ZEN_ASSERT(Context.StoreMaxRangeCountPerRequest != 0);
+ ZEN_ASSERT(BlockExistsInCache == false || Context.CacheMaxRangeCountPerRequest != 0);
+
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ Ranges.reserve(BlockRangeDescriptors.size());
+ for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; BlockRangeIndex++)
+ {
+ const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeIndex];
+ Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength));
+ }
+
+ size_t SubBlockRangeCount = BlockRangeCount;
+ size_t SubRangeCountComplete = 0;
+ std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges);
+
+ while (SubRangeCountComplete < SubBlockRangeCount)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+
+ size_t SubRangeStartIndex = BlockRangeIndexStart + SubRangeCountComplete;
+ if (BlockExistsInCache)
+ {
+ ZEN_ASSERT(Context.OptionalCache);
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.CacheMaxRangeCountPerRequest);
+
+ if (SubRangeCount == 1)
+ {
+ // Legacy single-range path, prefer that for max compatibility
+
+ const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete];
+ Stopwatch CacheTimer;
+ IoBuffer PayloadBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId,
+ BlockDescription.BlockHash,
+ SubRange.first,
+ SubRange.second);
+ DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0;
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ if (PayloadBuffer)
+ {
+ OnDownloaded(std::move(PayloadBuffer),
+ SubRangeStartIndex,
+ std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)});
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ }
+ else
+ {
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
+
+ Stopwatch CacheTimer;
+ BuildStorageCache::BuildBlobRanges RangeBuffers =
+ Context.OptionalCache->GetBuildBlobRanges(Context.CacheBuildId, BlockDescription.BlockHash, SubRanges);
+ DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0;
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ if (RangeBuffers.PayloadBuffer)
+ {
+ if (RangeBuffers.Ranges.empty())
+ {
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ OnDownloaded(std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount));
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ else if (RangeBuffers.Ranges.size() == SubRangeCount)
+ {
+ OnDownloaded(std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangeBuffers.Ranges);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ }
+ }
+ }
+
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.StoreMaxRangeCountPerRequest);
+
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
+
+ RemoteProjectStore::LoadAttachmentRangesResult BlockResult =
+ Context.RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges);
+ DownloadTimeSeconds += BlockResult.ElapsedSeconds;
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ if (BlockResult.ErrorCode || !BlockResult.Bytes)
+ {
+ ReportMessage(Context.OptionalJobContext,
+ fmt::format("Failed to download {} ranges from block attachment '{}' ({}): {}",
+ SubRanges.size(),
+ BlockDescription.BlockHash,
+ BlockResult.ErrorCode,
+ BlockResult.Reason));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!Context.IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ break;
+ }
+ }
+ else
+ {
+ if (BlockResult.Ranges.empty())
+ {
+ // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3
+ // Use the whole payload for the remaining ranges
+
+ if (Context.OptionalCache && Context.PopulateCache)
+ {
+ Context.OptionalCache->PutBuildBlob(Context.CacheBuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(std::vector<IoBuffer>{BlockResult.Bytes}));
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ }
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ OnDownloaded(std::move(BlockResult.Bytes),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount));
+ }
+ else
+ {
+ if (BlockResult.Ranges.size() != SubRanges.size())
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Range response for block {} contains {} ranges, expected {} ranges",
+ BlockDescription.BlockHash,
+ BlockResult.Ranges.size(),
+ SubRanges.size()),
+ "");
+ break;
+ }
+ OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, BlockResult.Ranges);
+ }
+ }
+
+ SubRangeCountComplete += SubRangeCount;
+ }
+ }
+
+ void DownloadAndSavePartialBlock(LoadOplogContext& Context,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ const ChunkBlockDescription& BlockDescription,
+ bool BlockExistsInCache,
+ std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors,
+ size_t BlockRangeIndexStart,
+ size_t BlockRangeCount,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& AllNeededPartialChunkHashesLookup,
+ std::span<std::atomic<bool>> ChunkDownloadedFlags,
+ uint32_t RetriesLeft)
+ {
+ AttachmentsDownloadLatch.AddCount(1);
+ Context.NetworkWorkerPool.ScheduleWork(
+ [&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &Context,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ BlockDescription,
+ BlockExistsInCache,
+ BlockRangeDescriptors,
+ BlockRangeIndexStart,
+ BlockRangeCount,
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ RetriesLeft]() {
+ ZEN_TRACE_CPU("DownloadBlockRanges");
+
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+
+ double DownloadElapsedSeconds = 0;
+ uint64_t DownloadedBytes = 0;
+
+ DownloadPartialBlock(
+ Context,
+ RemoteResult,
+ Info,
+ DownloadElapsedSeconds,
+ BlockDescription,
+ BlockExistsInCache,
+ BlockRangeDescriptors,
+ BlockRangeIndexStart,
+ BlockRangeCount,
+ [&](IoBuffer&& Buffer,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) {
+ uint64_t BlockPartSize = Buffer.GetSize();
+ DownloadedBytes += BlockPartSize;
+
+ Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize);
+ Info.AttachmentBlocksRangesDownloaded++;
+
+ AttachmentsWriteLatch.AddCount(1);
+ Context.WorkerPool.ScheduleWork(
+ [&AttachmentsWriteLatch,
+ &Context,
+ &AttachmentsDownloadLatch,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ BlockDescription,
+ BlockExistsInCache,
+ BlockRangeDescriptors,
+ BlockRangeStartIndex,
+ &AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ RetriesLeft,
+ BlockPayload = std::move(Buffer),
+ OffsetAndLengths =
+ std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ try
+ {
+ ZEN_ASSERT(BlockPayload.Size() > 0);
+
+ size_t RangeCount = OffsetAndLengths.size();
+ for (size_t RangeOffset = 0; RangeOffset < RangeCount; RangeOffset++)
+ {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
+ const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange =
+ BlockRangeDescriptors[BlockRangeStartIndex + RangeOffset];
+ const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengths[RangeOffset];
+ IoBuffer BlockRangeBuffer(BlockPayload, OffsetAndLength.first, OffsetAndLength.second);
+
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ uint64_t PotentialSize = 0;
+ uint64_t UsedSize = 0;
+ uint64_t BlockPartSize = BlockRangeBuffer.GetSize();
+
+ uint32_t OffsetInBlock = 0;
+ for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart;
+ ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount;
+ ChunkBlockIndex++)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+
+ const uint32_t ChunkCompressedSize =
+ BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+
+ if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash);
+ ChunkIndexIt != AllNeededPartialChunkHashesLookup.end())
+ {
+ if (!ChunkDownloadedFlags[ChunkIndexIt->second])
+ {
+ IoHash VerifyChunkHash;
+ uint64_t VerifyChunkSize;
+ CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(
+ SharedBuffer(IoBuffer(BlockRangeBuffer, OffsetInBlock, ChunkCompressedSize)),
+ VerifyChunkHash,
+ VerifyChunkSize);
+
+ std::string ErrorString;
+
+ if (!CompressedChunk)
+ {
+ ErrorString = fmt::format(
+ "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer",
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockDescription.BlockHash);
+ }
+ else if (VerifyChunkHash != ChunkHash)
+ {
+ ErrorString = fmt::format(
+ "Chunk at {},{} in block attachment '{}' has mismatching hash, expected "
+ "{}, got {}",
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockDescription.BlockHash,
+ ChunkHash,
+ VerifyChunkHash);
+ }
+ else if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex])
+ {
+ ErrorString = fmt::format(
+ "Chunk at {},{} in block attachment '{}' has mismatching raw size, "
+ "expected {}, "
+ "got {}",
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockDescription.BlockHash,
+ BlockDescription.ChunkRawLengths[ChunkBlockIndex],
+ VerifyChunkSize);
+ }
+
+ if (!ErrorString.empty())
+ {
+ if (RetriesLeft > 0)
+ {
+ ReportMessage(Context.OptionalJobContext,
+ fmt::format("{}, retrying download", ErrorString));
+ return DownloadAndSavePartialBlock(Context,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescription,
+ BlockExistsInCache,
+ BlockRangeDescriptors,
+ BlockRangeStartIndex,
+ RangeCount,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ RetriesLeft - 1);
+ }
+
+ ReportMessage(Context.OptionalJobContext, ErrorString);
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!Context.IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+ "Malformed chunk block",
+ ErrorString);
+ }
+ }
+ else
+ {
+ bool Expected = false;
+ if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected,
+ true))
+ {
+ WriteAttachmentBuffers.emplace_back(
+ CompressedChunk.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.emplace_back(ChunkHash);
+ PotentialSize += WriteAttachmentBuffers.back().GetSize();
+ }
+ }
+ }
+ }
+ OffsetInBlock += ChunkCompressedSize;
+ }
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ std::vector<CidStore::InsertResult> Results =
+ Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const CidStore::InsertResult& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ UsedSize += WriteAttachmentBuffers[Index].GetSize();
+ }
+ }
+ if (UsedSize < BlockPartSize)
+ {
+ ZEN_DEBUG(
+ "Used {} (skipping {}) out of {} for block {} range {}, {} ({} %) (use of matching "
+ "{}%)",
+ NiceBytes(UsedSize),
+ NiceBytes(BlockPartSize - UsedSize),
+ NiceBytes(BlockPartSize),
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength,
+ (100 * UsedSize) / BlockPartSize,
+ PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0);
+ }
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed saving {} ranges from block attachment {}",
+ OffsetAndLengths.size(),
+ BlockDescription.BlockHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ });
+ if (!RemoteResult.IsError())
+ {
+ ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})",
+ BlockRangeCount,
+ BlockDescription.BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(DownloadElapsedSeconds * 1000)),
+ NiceBytes(DownloadedBytes));
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash),
Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
};
- void DownloadAndSaveAttachment(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
+ void DownloadAndSaveAttachment(LoadOplogContext& Context,
Latch& AttachmentsDownloadLatch,
Latch& AttachmentsWriteLatch,
AsyncRemoteResult& RemoteResult,
@@ -640,19 +1093,15 @@ namespace remotestore_impl {
const IoHash& RawHash)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
- [&RemoteStore,
- &ChunkStore,
- &WorkerPool,
+ Context.NetworkWorkerPool.ScheduleWork(
+ [&Context,
&RemoteResult,
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
RawHash,
&LoadAttachmentsTimer,
&DownloadStartMS,
- &Info,
- IgnoreMissingAttachments,
- OptionalContext]() {
+ &Info]() {
ZEN_TRACE_CPU("DownloadAttachment");
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
@@ -664,43 +1113,52 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
+ IoBuffer BlobBuffer;
+ if (Context.OptionalCache)
{
- ReportMessage(OptionalContext,
- fmt::format("Failed to download large attachment {}: '{}', error code : {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, RawHash);
+ }
+ if (!BlobBuffer)
+ {
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = Context.RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
{
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ ReportMessage(Context.OptionalJobContext,
+ fmt::format("Failed to download large attachment {}: '{}', error code : {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!Context.IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ }
+ return;
+ }
+ BlobBuffer = std::move(AttachmentResult.Bytes);
+ ZEN_DEBUG("Loaded large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlobBuffer.GetSize()));
+ if (Context.OptionalCache && Context.PopulateCache)
+ {
+ Context.OptionalCache->PutBuildBlob(Context.CacheBuildId,
+ RawHash,
+ BlobBuffer.GetContentType(),
+ CompositeBuffer(SharedBuffer(BlobBuffer)));
}
- return;
}
- uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_INFO("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
- Info.AttachmentsDownloaded.fetch_add(1);
if (RemoteResult.IsError())
{
return;
}
+ uint64_t AttachmentSize = BlobBuffer.GetSize();
+ Info.AttachmentsDownloaded.fetch_add(1);
Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&AttachmentsWriteLatch,
- &RemoteResult,
- &Info,
- &ChunkStore,
- RawHash,
- AttachmentSize,
- Bytes = std::move(AttachmentResult.Bytes),
- OptionalContext]() {
+ Context.WorkerPool.ScheduleWork(
+ [&Context, &AttachmentsWriteLatch, &RemoteResult, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)]() {
ZEN_TRACE_CPU("WriteAttachment");
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
@@ -710,7 +1168,7 @@ namespace remotestore_impl {
}
try
{
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
+ CidStore::InsertResult InsertResult = Context.ChunkStore.AddChunk(Bytes, RawHash);
if (InsertResult.New)
{
Info.AttachmentBytesStored.fetch_add(AttachmentSize);
@@ -1126,7 +1584,9 @@ namespace remotestore_impl {
uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs();
ReportProgress(OptionalContext,
"Saving attachments"sv,
- fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
+ fmt::format("{} remaining... {}",
+ Remaining,
+ GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, PartialTransferWallTimeMS)),
AttachmentsToSave,
Remaining);
}
@@ -1135,7 +1595,7 @@ namespace remotestore_impl {
{
ReportProgress(OptionalContext,
"Saving attachments"sv,
- fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)),
+ fmt::format("{}", GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS)),
AttachmentsToSave,
0);
}
@@ -1146,7 +1606,7 @@ namespace remotestore_impl {
LargeAttachmentCountToUpload,
BulkAttachmentCountToUpload,
NiceTimeSpanMs(ElapsedTimeMS),
- GetStats(RemoteStore.GetStats(), ElapsedTimeMS)));
+ GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS)));
}
} // namespace remotestore_impl
@@ -1224,35 +1684,7 @@ BuildContainer(CidStore& ChunkStore,
{
using namespace std::literals;
- class JobContextLogOutput : public OperationLogOutput
- {
- public:
- JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {}
- virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) override
- {
- ZEN_UNUSED(LogLevel);
- if (m_OptionalContext)
- {
- fmt::basic_memory_buffer<char, 250> MessageBuffer;
- fmt::vformat_to(fmt::appender(MessageBuffer), Format, Args);
- remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size()));
- }
- }
-
- virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); }
- virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); }
- virtual uint32_t GetProgressUpdateDelayMS() override { return 0; }
- virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override
- {
- ZEN_UNUSED(InSubTask);
- return nullptr;
- }
-
- private:
- JobContext* m_OptionalContext;
- };
-
- std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<JobContextLogOutput>(OptionalContext));
+ std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext));
size_t OpCount = 0;
@@ -1783,31 +2215,36 @@ BuildContainer(CidStore& ChunkStore,
}
ResolveAttachmentsLatch.CountDown();
- while (!ResolveAttachmentsLatch.Wait(1000))
{
- ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ ptrdiff_t AttachmentCountToUseForProgress = ResolveAttachmentsLatch.Remaining();
+ while (!ResolveAttachmentsLatch.Wait(1000))
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- while (!ResolveAttachmentsLatch.Wait(1000))
+ ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
- Remaining = ResolveAttachmentsLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("Aborting, {} attachments remaining...", Remaining),
- UploadAttachments.size(),
- Remaining);
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ while (!ResolveAttachmentsLatch.Wait(1000))
+ {
+ Remaining = ResolveAttachmentsLatch.Remaining();
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("Aborting, {} attachments remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining);
+ }
+ remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0);
+ return {};
}
- remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0);
- return {};
+ AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("{} remaining...", Remaining),
+ AttachmentCountToUseForProgress,
+ Remaining);
}
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("{} remaining...", Remaining),
- UploadAttachments.size(),
- Remaining);
}
if (UploadAttachments.size() > 0)
{
@@ -2010,14 +2447,13 @@ BuildContainer(CidStore& ChunkStore,
AsyncOnBlock,
RemoteResult);
ComposedBlocks++;
+ // Worker will set Blocks[BlockIndex] = Block (including ChunkRawHashes) under shared lock
}
else
{
ZEN_INFO("Bulk group {} attachments", ChunkCount);
OnBlockChunks(std::move(ChunksInBlock));
- }
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ // We can share the lock as we are not resizing the vector and only touch our own index
RwLock::SharedLockScope _(BlocksLock);
Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes);
}
@@ -2195,12 +2631,14 @@ BuildContainer(CidStore& ChunkStore,
0);
}
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}",
- ChunkAssembleCount,
- TotalOpCount,
- GeneratedBlockCount,
- NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}",
+ ChunkAssembleCount,
+ TotalOpCount,
+ GeneratedBlockCount,
+ LargeChunkHashes.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -2752,30 +3190,32 @@ SaveOplog(CidStore& ChunkStore,
remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats());
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}",
- RemoteStoreInfo.ContainerName,
- RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- NiceBytes(Info.OplogSizeBytes),
- Info.AttachmentBlocksUploaded.load(),
- NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
- Info.AttachmentsUploaded.load(),
- NiceBytes(Info.AttachmentBytesUploaded.load()),
- remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}",
+ RemoteStoreInfo.ContainerName,
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ NiceBytes(Info.OplogSizeBytes),
+ Info.AttachmentBlocksUploaded.load(),
+ NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
+ Info.AttachmentsUploaded.load(),
+ NiceBytes(Info.AttachmentBytesUploaded.load()),
+ remotestore_impl::GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, TransferWallTimeMS)));
return Result;
};
RemoteProjectStore::Result
-ParseOplogContainer(const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- CbObject& OutOplogSection,
- JobContext* OptionalContext)
+ParseOplogContainer(
+ const CbObject& ContainerObject,
+ const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ CbObject& OutOplogSection,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2801,22 +3241,43 @@ ParseOplogContainer(const CbObject& ContainerObject,
"Section has unexpected data type",
"Failed to save oplog container"};
}
- std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments;
+ std::unordered_set<IoHash, IoHash::Hasher> NeededAttachments;
{
CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView();
+
+ size_t OpCount = OpsArray.Num();
+ size_t OpsCompleteCount = 0;
+
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Scanning {} ops for attachments", OpCount));
+
for (CbFieldView OpEntry : OpsArray)
{
- OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); });
+ OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); });
if (remotestore_impl::IsCancelled(OptionalContext))
{
return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
.Reason = "Operation cancelled"};
}
+ OpsCompleteCount++;
+ if ((OpsCompleteCount & 4095) == 0)
+ {
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Scanning oplog"sv,
+ fmt::format("{} attachments found, {} ops remaining...", NeededAttachments.size(), OpCount - OpsCompleteCount),
+ OpCount,
+ OpCount - OpsCompleteCount);
+ }
}
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Scanning oplog"sv,
+ fmt::format("{} attachments found", NeededAttachments.size()),
+ OpCount,
+ OpCount - OpsCompleteCount);
}
{
- std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end());
+ std::vector<IoHash> ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end());
OnReferencedAttachments(ReferencedAttachments);
}
@@ -2827,24 +3288,41 @@ ParseOplogContainer(const CbObject& ContainerObject,
.Reason = "Operation cancelled"};
}
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", NeededAttachments.size()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
for (CbFieldView ChunkedFileField : ChunkedFilesArray)
{
CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView();
IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash();
- if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash)))
+ if (NeededAttachments.erase(RawHash) == 1)
{
- ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView);
+ if (!HasAttachment(RawHash))
+ {
+ ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView);
+
+ size_t NeededChunkAttachmentCount = 0;
- OnReferencedAttachments(Chunked.ChunkHashes);
- OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end());
- OnChunkedAttachment(Chunked);
- ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- Chunked.ChunkHashes.size());
+ OnReferencedAttachments(Chunked.ChunkHashes);
+ for (const IoHash& ChunkHash : Chunked.ChunkHashes)
+ {
+ if (!HasAttachment(ChunkHash))
+ {
+ if (NeededAttachments.insert(ChunkHash).second)
+ {
+ NeededChunkAttachmentCount++;
+ }
+ }
+ }
+ OnChunkedAttachment(Chunked);
+
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Requesting chunked attachment '{}' ({}) built from {} chunks, need {} chunks",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ Chunked.ChunkHashes.size(),
+ NeededChunkAttachmentCount));
+ }
}
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -2854,6 +3332,8 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
+ std::vector<ThinChunkBlockDescription> ThinBlocksDescriptions;
+
size_t NeedBlockCount = 0;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
@@ -2863,45 +3343,38 @@ ParseOplogContainer(const CbObject& ContainerObject,
CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(ChunksArray.Num());
- if (BlockHash == IoHash::Zero)
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkField : ChunksArray)
{
- for (CbFieldView ChunkField : ChunksArray)
- {
- IoHash ChunkHash = ChunkField.AsBinaryAttachment();
- if (OpsAttachments.erase(ChunkHash) == 1)
- {
- if (!HasAttachment(ChunkHash))
- {
- NeededChunks.emplace_back(ChunkHash);
- }
- }
- }
+ ChunkHashes.push_back(ChunkField.AsHash());
}
- else
+ ThinBlocksDescriptions.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)});
+ }
+
+ for (ThinChunkBlockDescription& ThinBlockDescription : ThinBlocksDescriptions)
+ {
+ std::vector<uint32_t> NeededBlockChunkIndexes;
+ for (uint32_t ChunkIndex = 0; ChunkIndex < ThinBlockDescription.ChunkRawHashes.size(); ChunkIndex++)
{
- for (CbFieldView ChunkField : ChunksArray)
+ const IoHash& ChunkHash = ThinBlockDescription.ChunkRawHashes[ChunkIndex];
+ if (NeededAttachments.erase(ChunkHash) == 1)
{
- const IoHash ChunkHash = ChunkField.AsHash();
- if (OpsAttachments.erase(ChunkHash) == 1)
+ if (!HasAttachment(ChunkHash))
{
- if (!HasAttachment(ChunkHash))
- {
- NeededChunks.emplace_back(ChunkHash);
- }
+ NeededBlockChunkIndexes.push_back(ChunkIndex);
}
}
}
-
- if (!NeededChunks.empty())
+ if (!NeededBlockChunkIndexes.empty())
{
- OnNeedBlock(BlockHash, std::move(NeededChunks));
- if (BlockHash != IoHash::Zero)
+ if (ThinBlockDescription.BlockHash != IoHash::Zero)
{
NeedBlockCount++;
}
+ OnNeedBlock(std::move(ThinBlockDescription), std::move(NeededBlockChunkIndexes));
}
+
if (remotestore_impl::IsCancelled(OptionalContext))
{
return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
@@ -2909,6 +3382,7 @@ ParseOplogContainer(const CbObject& ContainerObject,
.Reason = "Operation cancelled"};
}
}
+
remotestore_impl::ReportMessage(OptionalContext,
fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
@@ -2918,7 +3392,7 @@ ParseOplogContainer(const CbObject& ContainerObject,
{
IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
- if (OpsAttachments.erase(AttachmentHash) == 1)
+ if (NeededAttachments.erase(AttachmentHash) == 1)
{
if (!HasAttachment(AttachmentHash))
{
@@ -2941,14 +3415,15 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
RemoteProjectStore::Result
-SaveOplogContainer(ProjectStore::Oplog& Oplog,
- const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- JobContext* OptionalContext)
+SaveOplogContainer(
+ ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2972,18 +3447,12 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
RemoteProjectStore::Result
-LoadOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- bool ForceDownload,
- bool IgnoreMissingAttachments,
- bool CleanOplog,
- JobContext* OptionalContext)
+LoadOplog(LoadOplogContext&& Context)
{
using namespace std::literals;
+ std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(Context.OptionalJobContext));
+
remotestore_impl::DownloadInfo Info;
Stopwatch Timer;
@@ -2991,25 +3460,25 @@ LoadOplog(CidStore& ChunkStore,
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
uint64_t BlockCountToDownload = 0;
- RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
+ RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = Context.RemoteStore.GetInfo();
+ remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
uint64_t TransferWallTimeMS = 0;
Stopwatch LoadContainerTimer;
- RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
+ RemoteProjectStore::LoadContainerResult LoadContainerResult = Context.RemoteStore.LoadContainer();
TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs();
if (LoadContainerResult.ErrorCode)
{
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode));
return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
.Reason = LoadContainerResult.Reason,
.Text = LoadContainerResult.Text};
}
- remotestore_impl::ReportMessage(OptionalContext,
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
fmt::format("Loaded container in {} ({})",
NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)),
NiceBytes(LoadContainerResult.ContainerObject.GetSize())));
@@ -3023,22 +3492,27 @@ LoadOplog(CidStore& ChunkStore,
Stopwatch LoadAttachmentsTimer;
std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1;
- auto HasAttachment = [&Oplog, &ChunkStore, ForceDownload](const IoHash& RawHash) {
- if (ForceDownload)
+ auto HasAttachment = [&Context](const IoHash& RawHash) {
+ if (Context.ForceDownload)
{
return false;
}
- if (ChunkStore.ContainsChunk(RawHash))
+ if (Context.ChunkStore.ContainsChunk(RawHash))
{
return true;
}
return false;
};
- auto OnNeedBlock = [&RemoteStore,
- &ChunkStore,
- &NetworkWorkerPool,
- &WorkerPool,
+ struct NeededBlockDownload
+ {
+ ThinChunkBlockDescription ThinBlockDescription;
+ std::vector<uint32_t> NeededChunkIndexes;
+ };
+
+ std::vector<NeededBlockDownload> NeededBlockDownloads;
+
+ auto OnNeedBlock = [&Context,
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
&AttachmentCount,
@@ -3047,8 +3521,8 @@ LoadOplog(CidStore& ChunkStore,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) {
+ &NeededBlockDownloads](ThinChunkBlockDescription&& ThinBlockDescription,
+ std::vector<uint32_t>&& NeededChunkIndexes) {
if (RemoteResult.IsError())
{
return;
@@ -3056,47 +3530,26 @@ LoadOplog(CidStore& ChunkStore,
BlockCountToDownload++;
AttachmentCount.fetch_add(1);
- if (BlockHash == IoHash::Zero)
- {
- DownloadAndSaveBlockChunks(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ if (ThinBlockDescription.BlockHash == IoHash::Zero)
+ {
+ DownloadAndSaveBlockChunks(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
Info,
LoadAttachmentsTimer,
DownloadStartMS,
- Chunks);
+ std::move(ThinBlockDescription),
+ std::move(NeededChunkIndexes));
}
else
{
- DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- Chunks,
- 3);
+ NeededBlockDownloads.push_back(NeededBlockDownload{.ThinBlockDescription = std::move(ThinBlockDescription),
+ .NeededChunkIndexes = std::move(NeededChunkIndexes)});
}
};
- auto OnNeedAttachment = [&RemoteStore,
- &Oplog,
- &ChunkStore,
- &NetworkWorkerPool,
- &WorkerPool,
+ auto OnNeedAttachment = [&Context,
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
&RemoteResult,
@@ -3104,9 +3557,7 @@ LoadOplog(CidStore& ChunkStore,
&AttachmentCount,
&LoadAttachmentsTimer,
&DownloadStartMS,
- &Info,
- IgnoreMissingAttachments,
- OptionalContext](const IoHash& RawHash) {
+ &Info](const IoHash& RawHash) {
if (!Attachments.insert(RawHash).second)
{
return;
@@ -3116,12 +3567,7 @@ LoadOplog(CidStore& ChunkStore,
return;
}
AttachmentCount.fetch_add(1);
- DownloadAndSaveAttachment(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ DownloadAndSaveAttachment(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
@@ -3132,18 +3578,13 @@ LoadOplog(CidStore& ChunkStore,
};
std::vector<ChunkedInfo> FilesToDechunk;
- auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) {
- if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash))
- {
- FilesToDechunk.push_back(Chunked);
- }
- };
+ auto OnChunkedAttachment = [&FilesToDechunk](const ChunkedInfo& Chunked) { FilesToDechunk.push_back(Chunked); };
- auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); };
+ auto OnReferencedAttachments = [&Context](std::span<IoHash> RawHashes) { Context.Oplog.CaptureAddedAttachments(RawHashes); };
// Make sure we retain any attachments we download before writing the oplog
- Oplog.EnableUpdateCapture();
- auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); });
+ Context.Oplog.EnableUpdateCapture();
+ auto _ = MakeGuard([&Context]() { Context.Oplog.DisableUpdateCapture(); });
CbObject OplogSection;
RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject,
@@ -3153,40 +3594,268 @@ LoadOplog(CidStore& ChunkStore,
OnNeedAttachment,
OnChunkedAttachment,
OplogSection,
- OptionalContext);
+ Context.OptionalJobContext);
if (Result.ErrorCode != 0)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
- remotestore_impl::ReportMessage(OptionalContext,
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
Attachments.size(),
BlockCountToDownload,
FilesToDechunk.size()));
- AttachmentsDownloadLatch.CountDown();
- while (!AttachmentsDownloadLatch.Wait(1000))
+ std::vector<IoHash> BlockHashes;
+ std::vector<IoHash> AllNeededChunkHashes;
+ BlockHashes.reserve(NeededBlockDownloads.size());
+ for (const NeededBlockDownload& BlockDownload : NeededBlockDownloads)
{
- ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ BlockHashes.push_back(BlockDownload.ThinBlockDescription.BlockHash);
+ for (uint32_t ChunkIndex : BlockDownload.NeededChunkIndexes)
{
- if (!RemoteResult.IsError())
+ AllNeededChunkHashes.push_back(BlockDownload.ThinBlockDescription.ChunkRawHashes[ChunkIndex]);
+ }
+ }
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllNeededPartialChunkHashesLookup = BuildHashLookup(AllNeededChunkHashes);
+ std::vector<std::atomic<bool>> ChunkDownloadedFlags(AllNeededChunkHashes.size());
+ std::vector<bool> DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false);
+ ChunkBlockAnalyser::BlockResult PartialBlocksResult;
+
+ remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Fetching descriptions for {} blocks", BlockHashes.size()));
+
+ RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions =
+ Context.RemoteStore.GetBlockDescriptions(BlockHashes, Context.OptionalCache, Context.CacheBuildId);
+
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
+ fmt::format("GetBlockDescriptions took {}. Found {} blocks",
+ NiceTimeSpanMs(uint64_t(BlockDescriptions.ElapsedSeconds * 1000)),
+ BlockDescriptions.Blocks.size()));
+
+ std::vector<IoHash> BlocksWithDescription;
+ BlocksWithDescription.reserve(BlockDescriptions.Blocks.size());
+ for (const ChunkBlockDescription& BlockDescription : BlockDescriptions.Blocks)
+ {
+ BlocksWithDescription.push_back(BlockDescription.BlockHash);
+ }
+ {
+ auto WantIt = NeededBlockDownloads.begin();
+ auto FindIt = BlockDescriptions.Blocks.begin();
+ while (WantIt != NeededBlockDownloads.end())
+ {
+ if (FindIt == BlockDescriptions.Blocks.end())
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ // Fall back to full download as we can't get enough information about the block
+ DownloadAndSaveBlock(Context,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ WantIt->ThinBlockDescription.BlockHash,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ 3);
+ for (uint32_t BlockChunkIndex : WantIt->NeededChunkIndexes)
+ {
+ const IoHash& ChunkHash = WantIt->ThinBlockDescription.ChunkRawHashes[BlockChunkIndex];
+ auto It = AllNeededPartialChunkHashesLookup.find(ChunkHash);
+ ZEN_ASSERT(It != AllNeededPartialChunkHashesLookup.end());
+ uint32_t ChunkIndex = It->second;
+ DownloadedViaLegacyChunkFlag[ChunkIndex] = true;
+ }
+ WantIt++;
+ }
+ else if (WantIt->ThinBlockDescription.BlockHash == FindIt->BlockHash)
+ {
+ // Found
+ FindIt++;
+ WantIt++;
+ }
+ else
+ {
+ // Not a requested block?
+ ZEN_ASSERT(false);
}
}
- uint64_t PartialTransferWallTimeMS = TransferWallTimeMS;
- if (DownloadStartMS != (uint64_t)-1)
+ }
+ if (!AllNeededChunkHashes.empty())
+ {
+ std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes;
+ std::vector<bool> BlockExistsInCache(BlocksWithDescription.size(), false);
+
+ if (Context.PartialBlockRequestMode == EPartialBlockRequestMode::Off)
{
- PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
+ PartialBlockDownloadModes.resize(BlocksWithDescription.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ }
+ else
+ {
+ if (Context.OptionalCache)
+ {
+ std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ Context.OptionalCache->BlobsExists(Context.CacheBuildId, BlocksWithDescription);
+ if (CacheExistsResult.size() == BlocksWithDescription.size())
+ {
+ for (size_t BlobIndex = 0; BlobIndex < CacheExistsResult.size(); BlobIndex++)
+ {
+ BlockExistsInCache[BlobIndex] = CacheExistsResult[BlobIndex].HasBody;
+ }
+ }
+ uint64_t FoundBlocks =
+ std::accumulate(BlockExistsInCache.begin(),
+ BlockExistsInCache.end(),
+ uint64_t(0u),
+ [](uint64_t Current, bool Exists) -> uint64_t { return Current + (Exists ? 1 : 0); });
+ if (FoundBlocks > 0)
+ {
+ remotestore_impl::ReportMessage(
+ Context.OptionalJobContext,
+ fmt::format("Found {} out of {} blocks in cache", FoundBlocks, BlockExistsInCache.size()));
+ }
+ }
+
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+
+ switch (Context.PartialBlockRequestMode)
+ {
+ case EPartialBlockRequestMode::Off:
+ break;
+ case EPartialBlockRequestMode::ZenCacheOnly:
+ CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+ break;
+ case EPartialBlockRequestMode::Mixed:
+ CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
+ case EPartialBlockRequestMode::All:
+ CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = Context.StoreMaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
+ }
+
+ PartialBlockDownloadModes.reserve(BlocksWithDescription.size());
+ for (uint32_t BlockIndex = 0; BlockIndex < BlocksWithDescription.size(); BlockIndex++)
+ {
+ const bool BlockExistInCache = BlockExistsInCache[BlockIndex];
+ PartialBlockDownloadModes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode);
+ }
+ }
+
+ ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size());
+
+ ChunkBlockAnalyser PartialAnalyser(
+ *LogOutput,
+ BlockDescriptions.Blocks,
+ ChunkBlockAnalyser::Options{.IsQuiet = false,
+ .IsVerbose = false,
+ .HostLatencySec = Context.StoreLatencySec,
+ .HostHighSpeedLatencySec = Context.CacheLatencySec,
+ .HostMaxRangeCountPerRequest = Context.StoreMaxRangeCountPerRequest,
+ .HostHighSpeedMaxRangeCountPerRequest = Context.CacheMaxRangeCountPerRequest});
+
+ std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks =
+ PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup,
+ [&](uint32_t ChunkIndex) { return !DownloadedViaLegacyChunkFlag[ChunkIndex]; });
+
+ PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes);
+
+ for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes)
+ {
+ DownloadAndSaveBlock(Context,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescriptions.Blocks[FullBlockIndex].BlockHash,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ 3);
+ }
+
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();)
+ {
+ size_t RangeCount = 1;
+ size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex;
+ const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex];
+ while (RangeCount < RangesLeft &&
+ CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex)
+ {
+ RangeCount++;
+ }
+
+ DownloadAndSavePartialBlock(Context,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex],
+ BlockExistsInCache[CurrentBlockRange.BlockIndex],
+ PartialBlocksResult.BlockRanges,
+ BlockRangeIndex,
+ RangeCount,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ /* RetriesLeft*/ 3);
+
+ BlockRangeIndex += RangeCount;
+ }
+ }
+
+ AttachmentsDownloadLatch.CountDown();
+ {
+ ptrdiff_t AttachmentCountToUseForProgress = AttachmentsDownloadLatch.Remaining();
+ while (!AttachmentsDownloadLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining();
+ if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ uint64_t PartialTransferWallTimeMS = TransferWallTimeMS;
+ if (DownloadStartMS != (uint64_t)-1)
+ {
+ PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
+ }
+
+ uint64_t AttachmentsDownloaded =
+ Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
+ uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() +
+ Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load();
+
+ AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
+ remotestore_impl::ReportProgress(
+ Context.OptionalJobContext,
+ "Loading attachments"sv,
+ fmt::format(
+ "{} ({}) downloaded, {} ({}) stored, {} remaining. {}",
+ AttachmentsDownloaded,
+ NiceBytes(AttachmentBytesDownloaded),
+ Info.AttachmentsStored.load(),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Remaining,
+ remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, PartialTransferWallTimeMS)),
+ AttachmentCountToUseForProgress,
+ Remaining);
}
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Loading attachments"sv,
- fmt::format("{} remaining. {}", Remaining, remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
- AttachmentCount.load(),
- Remaining);
}
if (DownloadStartMS != (uint64_t)-1)
{
@@ -3195,57 +3864,58 @@ LoadOplog(CidStore& ChunkStore,
if (AttachmentCount.load() > 0)
{
- remotestore_impl::ReportProgress(OptionalContext,
- "Loading attachments"sv,
- fmt::format("{}", remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)),
- AttachmentCount.load(),
- 0);
+ remotestore_impl::ReportProgress(
+ Context.OptionalJobContext,
+ "Loading attachments"sv,
+ fmt::format("{}", remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS)),
+ AttachmentCount.load(),
+ 0);
}
AttachmentsWriteLatch.CountDown();
- while (!AttachmentsWriteLatch.Wait(1000))
{
- ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ ptrdiff_t AttachmentCountToUseForProgress = AttachmentsWriteLatch.Remaining();
+ while (!AttachmentsWriteLatch.Wait(1000))
{
- if (!RemoteResult.IsError())
+ ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining();
+ if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
}
+ AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
+ remotestore_impl::ReportProgress(Context.OptionalJobContext,
+ "Writing attachments"sv,
+ fmt::format("{} ({}), {} remaining.",
+ Info.AttachmentsStored.load(),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Remaining),
+ AttachmentCountToUseForProgress,
+ Remaining);
}
- remotestore_impl::ReportProgress(OptionalContext,
- "Writing attachments"sv,
- fmt::format("{} remaining.", Remaining),
- AttachmentCount.load(),
- Remaining);
}
if (AttachmentCount.load() > 0)
{
- remotestore_impl::ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0);
+ remotestore_impl::ReportProgress(Context.OptionalJobContext, "Writing attachments", ""sv, AttachmentCount.load(), 0);
}
if (Result.ErrorCode == 0)
{
if (!FilesToDechunk.empty())
{
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
+ remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
Latch DechunkLatch(1);
- std::filesystem::path TempFilePath = Oplog.TempPath();
+ std::filesystem::path TempFilePath = Context.Oplog.TempPath();
for (const ChunkedInfo& Chunked : FilesToDechunk)
{
std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
DechunkLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&ChunkStore,
- &DechunkLatch,
- TempFileName,
- &Chunked,
- &RemoteResult,
- IgnoreMissingAttachments,
- &Info,
- OptionalContext]() {
+ Context.WorkerPool.ScheduleWork(
+ [&Context, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, &Info]() {
ZEN_TRACE_CPU("DechunkAttachment");
auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
@@ -3279,16 +3949,16 @@ LoadOplog(CidStore& ChunkStore,
for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
{
const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
- IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
+ IoBuffer Chunk = Context.ChunkStore.FindChunkByCid(ChunkHash);
if (!Chunk)
{
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
// We only add 1 as the resulting missing count will be 1 for the dechunked file
Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ if (!Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(
gsl::narrow<int>(HttpResponseCode::NotFound),
@@ -3306,7 +3976,7 @@ LoadOplog(CidStore& ChunkStore,
if (RawHash != ChunkHash)
{
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}",
RawHash,
ChunkHash,
@@ -3314,7 +3984,7 @@ LoadOplog(CidStore& ChunkStore,
// We only add 1 as the resulting missing count will be 1 for the dechunked file
Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ if (!Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(
gsl::narrow<int>(HttpResponseCode::NotFound),
@@ -3351,14 +4021,14 @@ LoadOplog(CidStore& ChunkStore,
}))
{
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Failed to decompress chunk {} for chunked attachment {}",
ChunkHash,
Chunked.RawHash));
// We only add 1 as the resulting missing count will be 1 for the dechunked file
Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ if (!Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(
gsl::narrow<int>(HttpResponseCode::NotFound),
@@ -3380,11 +4050,12 @@ LoadOplog(CidStore& ChunkStore,
TmpFile.Close();
TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName);
}
+ uint64_t TmpBufferSize = TmpBuffer.GetSize();
CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
+ Context.ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
if (InsertResult.New)
{
- Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
+ Info.AttachmentBytesStored.fetch_add(TmpBufferSize);
Info.AttachmentsStored.fetch_add(1);
}
@@ -3407,54 +4078,58 @@ LoadOplog(CidStore& ChunkStore,
while (!DechunkLatch.Wait(1000))
{
ptrdiff_t Remaining = DechunkLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
{
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
}
- remotestore_impl::ReportProgress(OptionalContext,
+ remotestore_impl::ReportProgress(Context.OptionalJobContext,
"Dechunking attachments"sv,
fmt::format("{} remaining...", Remaining),
FilesToDechunk.size(),
Remaining);
}
- remotestore_impl::ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0);
+ remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0);
}
Result = RemoteResult.ConvertResult();
}
if (Result.ErrorCode == 0)
{
- if (CleanOplog)
+ if (Context.CleanOplog)
{
- RemoteStore.Flush();
- if (!Oplog.Reset())
+ if (Context.OptionalCache)
+ {
+ Context.OptionalCache->Flush(100, [](intptr_t) { return /*DontWaitForPendingOperation*/ false; });
+ }
+ if (!Context.Oplog.Reset())
{
Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError),
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())};
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
+ .Reason = fmt::format("Failed to clean existing oplog '{}'", Context.Oplog.OplogId())};
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
+ fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
}
}
if (Result.ErrorCode == 0)
{
- remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext);
+ remotestore_impl::WriteOplogSection(Context.Oplog, OplogSection, Context.OptionalJobContext);
}
}
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats());
+ remotestore_impl::LogRemoteStoreStatsDetails(Context.RemoteStore.GetStats());
{
std::string DownloadDetails;
RemoteProjectStore::ExtendedStats ExtendedStats;
- if (RemoteStore.GetExtendedStats(ExtendedStats))
+ if (Context.RemoteStore.GetExtendedStats(ExtendedStats))
{
if (!ExtendedStats.m_ReceivedBytesPerSource.empty())
{
@@ -3473,26 +4148,37 @@ LoadOplog(CidStore& ChunkStore,
Total += It.second;
}
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Downloaded {} ({})", NiceBytes(Total), SB.ToView()));
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
+ fmt::format("Downloaded {} ({})", NiceBytes(Total), SB.ToView()));
}
}
}
+ uint64_t TotalDownloads =
+ 1 + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
+ uint64_t TotalBytesDownloaded = Info.OplogSizeBytes + Info.AttachmentBlockBytesDownloaded.load() +
+ Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load();
+
remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}",
+ Context.OptionalJobContext,
+ fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} "
+ "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}",
RemoteStoreInfo.ContainerName,
Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
NiceBytes(Info.OplogSizeBytes),
Info.AttachmentBlocksDownloaded.load(),
NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
+ Info.AttachmentBlocksRangesDownloaded.load(),
+ NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()),
Info.AttachmentsDownloaded.load(),
NiceBytes(Info.AttachmentBytesDownloaded.load()),
+ TotalDownloads,
+ NiceBytes(TotalBytesDownloaded),
Info.AttachmentsStored.load(),
NiceBytes(Info.AttachmentBytesStored.load()),
Info.MissingAttachmentCount.load(),
- remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
+ remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS)));
return Result;
}
@@ -3537,7 +4223,7 @@ RemoteProjectStore::~RemoteProjectStore()
#if ZEN_WITH_TESTS
-namespace testutils {
+namespace projectstore_testutils {
using namespace std::literals;
static std::string OidAsString(const Oid& Id)
@@ -3589,7 +4275,29 @@ namespace testutils {
return Result;
}
-} // namespace testutils
+ class TestJobContext : public JobContext
+ {
+ public:
+ explicit TestJobContext(int& OpIndex) : m_OpIndex(OpIndex) {}
+ virtual bool IsCancelled() const { return false; }
+ virtual void ReportMessage(std::string_view Message) { ZEN_INFO("Job {}: {}", m_OpIndex, Message); }
+ virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount)
+ {
+ ZEN_INFO("Job {}: Op '{}'{} {}/{}",
+ m_OpIndex,
+ CurrentOp,
+ Details.empty() ? "" : fmt::format(" {}", Details),
+ TotalCount - RemainingCount,
+ TotalCount);
+ }
+
+ private:
+ int& m_OpIndex;
+ };
+
+} // namespace projectstore_testutils
+
+TEST_SUITE_BEGIN("remotestore.projectstore");
struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse
{
@@ -3616,7 +4324,7 @@ TEST_CASE_TEMPLATE("project.store.export",
ExportForceDisableBlocksFalse_ForceTempBlocksTrue)
{
using namespace std::literals;
- using namespace testutils;
+ using namespace projectstore_testutils;
ScopedTemporaryDirectory TempDir;
ScopedTemporaryDirectory ExportDir;
@@ -3684,56 +4392,712 @@ TEST_CASE_TEMPLATE("project.store.export",
false,
nullptr);
- CHECK(ExportResult.ErrorCode == 0);
+ REQUIRE(ExportResult.ErrorCode == 0);
Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {});
CHECK(OplogImport);
- RemoteProjectStore::Result ImportResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- NetworkPool,
- WorkerPool,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- nullptr);
+ int OpJobIndex = 0;
+ TestJobContext OpJobContext(OpJobIndex);
+
+ RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = Oid::Zero,
+ .Oplog = *OplogImport,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &OpJobContext});
CHECK(ImportResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- NetworkPool,
- WorkerPool,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- nullptr);
+ OpJobIndex++;
+
+ RemoteProjectStore::Result ImportForceResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = Oid::Zero,
+ .Oplog = *OplogImport,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = true,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &OpJobContext});
CHECK(ImportForceResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- NetworkPool,
- WorkerPool,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- nullptr);
+ OpJobIndex++;
+
+ RemoteProjectStore::Result ImportCleanResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = Oid::Zero,
+ .Oplog = *OplogImport,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = true,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &OpJobContext});
CHECK(ImportCleanResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- NetworkPool,
- WorkerPool,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- nullptr);
+ OpJobIndex++;
+
+ RemoteProjectStore::Result ImportForceCleanResult =
+ LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = Oid::Zero,
+ .Oplog = *OplogImport,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = true,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = true,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &OpJobContext});
CHECK(ImportForceCleanResult.ErrorCode == 0);
+ OpJobIndex++;
}
+// Common oplog setup used by the two tests below.
+// Returns a FileRemoteStore backed by ExportDir that has been populated with a SaveOplog call.
+// Keeps the test data identical to project.store.export so the two test suites exercise the same blocks/attachments.
+static RemoteProjectStore::Result
+SetupExportStore(CidStore& CidStore,
+ ProjectStore::Project& Project,
+ WorkerThreadPool& NetworkPool,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& ExportDir,
+ std::shared_ptr<RemoteProjectStore>& OutRemoteStore)
+{
+ using namespace projectstore_testutils;
+ using namespace std::literals;
+
+ Ref<ProjectStore::Oplog> Oplog = Project.NewOplog("oplog_export", {});
+ if (!Oplog)
+ {
+ return RemoteProjectStore::Result{.ErrorCode = -1};
+ }
+
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
+ Oid::NewOid(),
+ CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
+
+ FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024,
+ .MaxChunksPerBlock = 1000,
+ .MaxChunkEmbedSize = 32 * 1024u,
+ .ChunkFileSizeLimit = 64u * 1024u},
+ /*.FolderPath =*/ExportDir,
+ /*.Name =*/std::string("oplog_export"),
+ /*.OptionalBaseName =*/std::string(),
+ /*.ForceDisableBlocks =*/false,
+ /*.ForceEnableTempBlocks =*/false};
+
+ OutRemoteStore = CreateFileRemoteStore(Log(), Options);
+ return SaveOplog(CidStore,
+ *OutRemoteStore,
+ Project,
+ *Oplog,
+ NetworkPool,
+ WorkerPool,
+ Options.MaxBlockSize,
+ Options.MaxChunksPerBlock,
+ Options.MaxChunkEmbedSize,
+ Options.ChunkFileSizeLimit,
+ /*EmbedLooseFiles*/ true,
+ /*ForceUpload*/ false,
+ /*IgnoreMissingAttachments*/ false,
+ /*OptionalContext*/ nullptr);
+}
+
+// Creates an export store with a single oplog entry that packs six 512 KB chunks into one
+// ~3 MB block (MaxBlockSize = 8 MB). The resulting block slack (~1.5 MB) far exceeds the
+// 512 KB threshold that ChunkBlockAnalyser requires before it will consider partial-block
+// downloads instead of full-block downloads.
+//
+// This function is self-contained: it creates its own GcManager, CidStore, ProjectStore and
+// Project internally so that each call is independent of any outer test context. After
+// SaveOplog returns, all persistent data lives on disk inside ExportDir and the caller can
+// freely query OutRemoteStore without holding any references to the internal context.
+static RemoteProjectStore::Result
+SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& ExportDir,
+ std::shared_ptr<RemoteProjectStore>& OutRemoteStore)
+{
+ using namespace projectstore_testutils;
+ using namespace std::literals;
+
+ // Self-contained CAS and project store. Subdirectories of ExportDir keep everything
+ // together without relying on the outer TEST_CASE's ExportCidStore / ExportProject.
+ GcManager LocalGc;
+ CidStore LocalCidStore(LocalGc);
+ CidStoreConfiguration LocalCidConfig = {.RootDirectory = ExportDir / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ LocalCidStore.Initialize(LocalCidConfig);
+
+ std::filesystem::path LocalProjectBasePath = ExportDir / "proj";
+ ProjectStore LocalProjectStore(LocalCidStore, LocalProjectBasePath, LocalGc, ProjectStore::Configuration{});
+ Ref<ProjectStore::Project> LocalProject(LocalProjectStore.NewProject(LocalProjectBasePath / "p"sv,
+ "p"sv,
+ (ExportDir / "root").string(),
+ (ExportDir / "engine").string(),
+ (ExportDir / "game").string(),
+ (ExportDir / "game" / "game.uproject").string()));
+
+ Ref<ProjectStore::Oplog> Oplog = LocalProject->NewOplog("oplog_partial_block", {});
+ if (!Oplog)
+ {
+ return RemoteProjectStore::Result{.ErrorCode = -1};
+ }
+
+ // Six 512 KB chunks with OodleCompressionLevel::None so the compressed size stays large
+ // and the block genuinely exceeds the 512 KB slack threshold.
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
+ Oid::NewOid(),
+ CreateAttachments(std::initializer_list<size_t>{512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u},
+ OodleCompressionLevel::None)));
+
+ // MaxChunkEmbedSize must be larger than the compressed size of each 512 KB chunk
+ // (OodleCompressionLevel::None → compressed ≈ raw ≈ 512 KB). With the legacy
+ // 32 KB limit all six chunks would become loose large attachments and no block would
+ // be created, so we use the production default of 1.5 MB instead.
+ FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 8u * 1024u * 1024u,
+ .MaxChunksPerBlock = 1000,
+ .MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize,
+ .ChunkFileSizeLimit = 64u * 1024u * 1024u},
+ /*.FolderPath =*/ExportDir,
+ /*.Name =*/std::string("oplog_partial_block"),
+ /*.OptionalBaseName =*/std::string(),
+ /*.ForceDisableBlocks =*/false,
+ /*.ForceEnableTempBlocks =*/false};
+ OutRemoteStore = CreateFileRemoteStore(Log(), Options);
+ return SaveOplog(LocalCidStore,
+ *OutRemoteStore,
+ *LocalProject,
+ *Oplog,
+ NetworkPool,
+ WorkerPool,
+ Options.MaxBlockSize,
+ Options.MaxChunksPerBlock,
+ Options.MaxChunkEmbedSize,
+ Options.ChunkFileSizeLimit,
+ /*EmbedLooseFiles*/ true,
+ /*ForceUpload*/ false,
+ /*IgnoreMissingAttachments*/ false,
+ /*OptionalContext*/ nullptr);
+}
+
+// Returns the first block hash that has at least MinChunkCount chunks, or a zero IoHash
+// if no qualifying block exists in Store.
+static IoHash
+FindBlockWithMultipleChunks(RemoteProjectStore& Store, size_t MinChunkCount)
+{
+ RemoteProjectStore::LoadContainerResult ContainerResult = Store.LoadContainer();
+ if (ContainerResult.ErrorCode != 0)
+ {
+ return {};
+ }
+ std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(ContainerResult.ContainerObject);
+ if (BlockHashes.empty())
+ {
+ return {};
+ }
+ RemoteProjectStore::GetBlockDescriptionsResult Descriptions = Store.GetBlockDescriptions(BlockHashes, nullptr, Oid{});
+ if (Descriptions.ErrorCode != 0)
+ {
+ return {};
+ }
+ for (const ChunkBlockDescription& Desc : Descriptions.Blocks)
+ {
+ if (Desc.ChunkRawHashes.size() >= MinChunkCount)
+ {
+ return Desc.BlockHash;
+ }
+ }
+ return {};
+}
+
+// Loads BlockHash from Source and inserts every even-indexed chunk (0, 2, 4, …) into
+// TargetCidStore. Odd-indexed chunks are left absent so that when an import is run
+// against the same block, HasAttachment returns false for three non-adjacent positions
+// — the minimum needed to exercise the multi-range partial-block download paths.
+static void
+SeedCidStoreWithAlternateChunks(CidStore& TargetCidStore, RemoteProjectStore& Source, const IoHash& BlockHash)
+{
+ RemoteProjectStore::LoadAttachmentResult BlockResult = Source.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode != 0 || !BlockResult.Bytes)
+ {
+ return;
+ }
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(BlockResult.Bytes), RawHash, RawSize);
+ if (!Compressed)
+ {
+ return;
+ }
+ CompositeBuffer BlockPayload = Compressed.DecompressToComposite();
+ if (!BlockPayload)
+ {
+ return;
+ }
+
+ uint32_t ChunkIndex = 0;
+ uint64_t HeaderSize = 0;
+ IterateChunkBlock(
+ BlockPayload.Flatten(),
+ [&TargetCidStore, &ChunkIndex](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) {
+ if (ChunkIndex % 2 == 0)
+ {
+ IoBuffer ChunkData = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ TargetCidStore.AddChunk(ChunkData, AttachmentHash);
+ }
+ ++ChunkIndex;
+ },
+ HeaderSize);
+}
+
+TEST_CASE("project.store.import.context_settings")
+{
+ using namespace std::literals;
+ using namespace projectstore_testutils;
+
+ ScopedTemporaryDirectory TempDir;
+ ScopedTemporaryDirectory ExportDir;
+
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+
+ // Export-side CAS and project store: used only by SetupExportStore to build the remote store
+ // payload. Kept separate from the import side so the two CAS instances are disjoint.
+ GcManager ExportGc;
+ CidStore ExportCidStore(ExportGc);
+ CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ ExportCidStore.Initialize(ExportCidConfig);
+
+ std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore";
+ ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{});
+ Ref<ProjectStore::Project> ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+
+ uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u);
+ uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u;
+ WorkerThreadPool WorkerPool(WorkerCount);
+ WorkerThreadPool NetworkPool(NetworkWorkerCount);
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
+ RemoteProjectStore::Result ExportResult =
+ SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path(), RemoteStore);
+ REQUIRE(ExportResult.ErrorCode == 0);
+
+ // Import-side CAS and project store: starts empty, mirroring a fresh machine that has never
+ // downloaded the data. HasAttachment() therefore returns false for every chunk, so the import
+ // genuinely contacts the remote store without needing ForceDownload on the populate pass.
+ GcManager ImportGc;
+ CidStore ImportCidStore(ImportGc);
+ CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ ImportCidStore.Initialize(ImportCidConfig);
+
+ std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore";
+ ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{});
+ Ref<ProjectStore::Project> ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+
+ const Oid CacheBuildId = Oid::NewOid();
+ BuildStorageCache::Statistics CacheStats;
+ std::unique_ptr<BuildStorageCache> Cache = CreateInMemoryBuildStorageCache(256u, CacheStats);
+ auto ResetCacheStats = [&]() {
+ CacheStats.TotalBytesRead = 0;
+ CacheStats.TotalBytesWritten = 0;
+ CacheStats.TotalRequestCount = 0;
+ CacheStats.TotalRequestTimeUs = 0;
+ CacheStats.TotalExecutionTimeUs = 0;
+ CacheStats.PeakSentBytes = 0;
+ CacheStats.PeakReceivedBytes = 0;
+ CacheStats.PeakBytesPerSec = 0;
+ CacheStats.PutBlobCount = 0;
+ CacheStats.PutBlobByteCount = 0;
+ };
+
+ int OpJobIndex = 0;
+
+ TestJobContext OpJobContext(OpJobIndex);
+
+ // Helper: run a LoadOplog against the import-side CAS/project with the given context knobs.
+ // Each call creates a fresh oplog so repeated calls within one SUBCASE don't short-circuit on
+ // already-present data.
+ auto DoImport = [&](BuildStorageCache* OptCache,
+ EPartialBlockRequestMode Mode,
+ double StoreLatency,
+ uint64_t StoreRanges,
+ double CacheLatency,
+ uint64_t CacheRanges,
+ bool PopulateCache,
+ bool ForceDownload) -> RemoteProjectStore::Result {
+ Ref<ProjectStore::Oplog> ImportOplog = ImportProject->NewOplog(fmt::format("import_{}", OpJobIndex++), {});
+ return LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = OptCache,
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *ImportOplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = ForceDownload,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = Mode,
+ .PopulateCache = PopulateCache,
+ .StoreLatencySec = StoreLatency,
+ .StoreMaxRangeCountPerRequest = StoreRanges,
+ .CacheLatencySec = CacheLatency,
+ .CacheMaxRangeCountPerRequest = CacheRanges,
+ .OptionalJobContext = &OpJobContext});
+ };
+
+ // Shorthand: Mode=All, low latency, 128 ranges for both store and cache.
+ auto ImportAll = [&](BuildStorageCache* OptCache, bool Populate, bool Force) {
+ return DoImport(OptCache, EPartialBlockRequestMode::All, 0.001, 128u, 0.001, 128u, Populate, Force);
+ };
+
+ SUBCASE("mode_off_no_cache")
+ {
+ // Baseline: no partial block requests, no cache.
+ RemoteProjectStore::Result R =
+ DoImport(nullptr, EPartialBlockRequestMode::Off, -1.0, (uint64_t)-1, -1.0, (uint64_t)-1, false, false);
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("mode_all_multirange_cloud_no_cache")
+ {
+ // StoreMaxRangeCountPerRequest > 1 → MultiRange cloud path.
+ RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 128u, -1.0, 0u, false, false);
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("mode_all_singlerange_cloud_no_cache")
+ {
+ // StoreMaxRangeCountPerRequest == 1 → SingleRange cloud path.
+ RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 1u, -1.0, 0u, false, false);
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("mode_mixed_high_latency_no_cache")
+ {
+ // High store latency encourages range merging; Mixed uses SingleRange for cloud, Off for cache.
+ RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::Mixed, 0.1, 128u, -1.0, 0u, false, false);
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("cache_populate_and_hit")
+ {
+ // First import: ImportCidStore is empty so all blocks are downloaded from the remote store
+ // and written to the cache.
+ RemoteProjectStore::Result PopulateResult = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false);
+ CHECK(PopulateResult.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount > 0);
+
+ // Re-import with ForceDownload=true: all chunks are now in ImportCidStore but Force overrides
+ // HasAttachment() so the download logic re-runs and serves blocks from the cache instead of
+ // the remote store.
+ ResetCacheStats();
+ RemoteProjectStore::Result HitResult = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true);
+ CHECK(HitResult.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount == 0);
+ // TotalRequestCount covers both full-blob cache hits and partial-range cache hits.
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("cache_no_populate_flag")
+ {
+ // Cache is provided but PopulateCache=false: blocks are downloaded to ImportCidStore but
+ // nothing should be written to the cache.
+ RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/false);
+ CHECK(R.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount == 0);
+ }
+
+ SUBCASE("mode_zencacheonly_cache_multirange")
+ {
+ // Pre-populate the cache via a plain import, then re-import with ZenCacheOnly +
+ // CacheMaxRangeCountPerRequest=128. With 100% of chunks needed, all blocks go to
+ // FullBlockIndexes and GetBuildBlob (full blob) is called from the cache.
+ // CacheMaxRangeCountPerRequest > 1 would route partial downloads through GetBuildBlobRanges
+ // if the analyser ever emits BlockRanges entries.
+ RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false);
+ CHECK(Populate.ErrorCode == 0);
+ ResetCacheStats();
+
+ RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 128u, false, true);
+ CHECK(R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("mode_zencacheonly_cache_singlerange")
+ {
+ // Pre-populate the cache, then re-import with ZenCacheOnly + CacheMaxRangeCountPerRequest=1.
+ // With 100% of chunks needed the analyser sends all blocks to FullBlockIndexes (full-block
+ // download path), which calls GetBuildBlob with no range offset — a full-blob cache hit.
+ // The single-range vs multi-range distinction only matters for the partial-block (BlockRanges)
+ // path, which is not reached when all chunks are needed.
+ RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false);
+ CHECK(Populate.ErrorCode == 0);
+ ResetCacheStats();
+
+ RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 1u, false, true);
+ CHECK(R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("mode_all_cache_and_cloud_multirange")
+ {
+ // Pre-populate cache; All mode uses multi-range for both the cache and cloud paths.
+ RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false);
+ CHECK(Populate.ErrorCode == 0);
+ ResetCacheStats();
+
+ RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true);
+ CHECK(R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("partial_block_cloud_multirange")
+ {
+ // Export store with 6 × 512 KB chunks packed into one ~3 MB block.
+ ScopedTemporaryDirectory PartialExportDir;
+ std::shared_ptr<RemoteProjectStore> PartialRemoteStore;
+ RemoteProjectStore::Result ExportR =
+ SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore);
+ REQUIRE(ExportR.ErrorCode == 0);
+
+ // Seeding even-indexed chunks (0, 2, 4) leaves odd ones (1, 3, 5) absent in
+ // ImportCidStore. Three non-adjacent needed positions → three BlockRangeDescriptors.
+ IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u);
+ CHECK(BlockHash != IoHash::Zero);
+ SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash);
+
+ // StoreMaxRangeCountPerRequest=128 → all three ranges sent in one LoadAttachmentRanges call.
+ Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_multi_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *PartialOplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::All,
+ .PopulateCache = false,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = -1.0,
+ .CacheMaxRangeCountPerRequest = 0u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("partial_block_cloud_singlerange")
+ {
+ // Same block layout as partial_block_cloud_multirange but StoreMaxRangeCountPerRequest=1.
+ // DownloadPartialBlock issues one LoadAttachmentRanges call per range.
+ ScopedTemporaryDirectory PartialExportDir;
+ std::shared_ptr<RemoteProjectStore> PartialRemoteStore;
+ RemoteProjectStore::Result ExportR =
+ SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore);
+ REQUIRE(ExportR.ErrorCode == 0);
+
+ IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u);
+ CHECK(BlockHash != IoHash::Zero);
+ SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash);
+
+ Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_single_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *PartialOplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::All,
+ .PopulateCache = false,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 1u,
+ .CacheLatencySec = -1.0,
+ .CacheMaxRangeCountPerRequest = 0u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("partial_block_cache_multirange")
+ {
+ ScopedTemporaryDirectory PartialExportDir;
+ std::shared_ptr<RemoteProjectStore> PartialRemoteStore;
+ RemoteProjectStore::Result ExportR =
+ SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore);
+ REQUIRE(ExportR.ErrorCode == 0);
+
+ IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u);
+ CHECK(BlockHash != IoHash::Zero);
+
+ // Phase 1: ImportCidStore starts empty → full block download from remote → PutBuildBlob
+ // populates the cache.
+ {
+ Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p1_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = Cache.get(),
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *Phase1Oplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::All,
+ .PopulateCache = true,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = 0.001,
+ .CacheMaxRangeCountPerRequest = 128u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(Phase1R.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount > 0);
+ }
+ ResetCacheStats();
+
+ // Phase 2: fresh CidStore with only even-indexed chunks seeded.
+ // HasAttachment returns false for odd chunks (1, 3, 5) → three BlockRangeDescriptors.
+ // Block is in cache from Phase 1 → cache partial path.
+ // CacheMaxRangeCountPerRequest=128 → SubRangeCount=3 > 1 → GetBuildBlobRanges.
+ GcManager Phase2Gc;
+ CidStore Phase2CidStore(Phase2Gc);
+ CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ Phase2CidStore.Initialize(Phase2CidConfig);
+ SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash);
+
+ Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p2_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = Cache.get(),
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *Phase2Oplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly,
+ .PopulateCache = false,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = 0.001,
+ .CacheMaxRangeCountPerRequest = 128u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(Phase2R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("partial_block_cache_singlerange")
+ {
+ ScopedTemporaryDirectory PartialExportDir;
+ std::shared_ptr<RemoteProjectStore> PartialRemoteStore;
+ RemoteProjectStore::Result ExportR =
+ SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore);
+ REQUIRE(ExportR.ErrorCode == 0);
+
+ IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u);
+ CHECK(BlockHash != IoHash::Zero);
+
+ // Phase 1: full block download from remote into cache.
+ {
+ Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p1_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = Cache.get(),
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *Phase1Oplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::All,
+ .PopulateCache = true,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = 0.001,
+ .CacheMaxRangeCountPerRequest = 128u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(Phase1R.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount > 0);
+ }
+ ResetCacheStats();
+
+ // Phase 2: fresh CidStore with only even-indexed chunks seeded.
+ // CacheMaxRangeCountPerRequest=1 → SubRangeCount=Min(3,1)=1 → GetBuildBlob with range
+ // offset (single-range legacy cache path), called once per needed chunk range.
+ GcManager Phase2Gc;
+ CidStore Phase2CidStore(Phase2Gc);
+ CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas_single",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ Phase2CidStore.Initialize(Phase2CidConfig);
+ SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash);
+
+ Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p2_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = Cache.get(),
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *Phase2Oplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly,
+ .PopulateCache = false,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = 0.001,
+ .CacheMaxRangeCountPerRequest = 1u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(Phase2R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+}
+
+TEST_SUITE_END();
+
#endif // ZEN_WITH_TESTS
void