diff options
Diffstat (limited to 'src')
3 files changed, 578 insertions, 609 deletions
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h index 88e756439..8df892053 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h @@ -242,7 +242,7 @@ struct LoadOplogContext JobContext* OptionalJobContext = nullptr; }; -RemoteProjectStore::Result LoadOplog(LoadOplogContext&& Context); +void LoadOplog(LoadOplogContext&& Context); std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject); std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 2cdc76034..55f40d223 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -3798,7 +3798,7 @@ SaveOplogContainer( return Result; } -RemoteProjectStore::Result +void LoadOplog(LoadOplogContext&& Context) { using namespace std::literals; @@ -3825,10 +3825,10 @@ LoadOplog(LoadOplogContext&& Context) remotestore_impl::ReportMessage( Context.OptionalJobContext, fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode)); - return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Reason = LoadContainerResult.Reason, - .Text = LoadContainerResult.Text}; + throw RemoteStoreError( + fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode), + LoadContainerResult.ErrorCode, + LoadContainerResult.Text); } remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loaded container in {} ({})", @@ -4158,56 +4158,44 @@ LoadOplog(LoadOplogContext&& Context) } uint64_t TotalChunksToDownload = AllNeededChunkHashes.size() + AttachmentsToDownload.size(); - try - { - AttachmentWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t /*Pending*/) { - if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag) - { - AbortFlag = true; - } - uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; - if (DownloadStartMS != (uint64_t)-1) - { - PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); - } - - uint64_t CompletedChunkCount = Info.ChunksCompleteCount.load(); - - uint64_t AttachmentsDownloaded = - Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); - uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + - Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); - - remotestore_impl::ReportProgress( - Context.OptionalJobContext, - "Loading attachments"sv, - fmt::format( - "{}/{} ({}) chunks. {} ({}) blobs downloaded. {}", - CompletedChunkCount, - TotalChunksToDownload, - NiceBytes(Info.AttachmentBytesStored.load()), - AttachmentsDownloaded, - NiceBytes(AttachmentBytesDownloaded), - remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, PartialTransferWallTimeMS)), - TotalChunksToDownload, - TotalChunksToDownload - CompletedChunkCount, - AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); - }); - } - catch (const RemoteStoreError& Ex) - { - if (!LoadResult.ErrorCode) + AttachmentWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t /*Pending*/) { + if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag) { - LoadResult = {.ErrorCode = Ex.GetErrorCode(), .Reason = Ex.what(), .Text = std::string(Ex.GetText())}; + AbortFlag = true; } - } - catch (const std::exception& Ex) - { - if (!LoadResult.ErrorCode) + uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; + if (DownloadStartMS != (uint64_t)-1) { - LoadResult = {.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), .Reason = Ex.what()}; + PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } + + uint64_t CompletedChunkCount = Info.ChunksCompleteCount.load(); + + uint64_t AttachmentsDownloaded = + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); + uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + Info.AttachmentBlockRangeBytesDownloaded.load() + + Info.AttachmentBytesDownloaded.load(); + + remotestore_impl::ReportProgress( + Context.OptionalJobContext, + "Loading attachments"sv, + fmt::format("{}/{} ({}) chunks. {} ({}) blobs downloaded. {}", + CompletedChunkCount, + TotalChunksToDownload, + NiceBytes(Info.AttachmentBytesStored.load()), + AttachmentsDownloaded, + NiceBytes(AttachmentBytesDownloaded), + remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, PartialTransferWallTimeMS)), + TotalChunksToDownload, + TotalChunksToDownload - CompletedChunkCount, + AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); + }); + + if (LoadResult.ErrorCode) + { + throw RemoteStoreError(LoadResult.Reason, LoadResult.ErrorCode, LoadResult.Text); } + if (DownloadStartMS != (uint64_t)-1) { TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); @@ -4222,77 +4210,139 @@ LoadOplog(LoadOplogContext&& Context) 0, AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); } - if (LoadResult.ErrorCode == 0) + if (!FilesToDechunk.empty()) { - if (!FilesToDechunk.empty()) - { - remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); - ParallelWork DechunkWork(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - std::filesystem::path TempFilePath = Context.Oplog.TempPath(); - for (size_t ChunkedIndex = 0; ChunkedIndex < FilesToDechunk.size(); ChunkedIndex++) - { - const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex]; - std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); - DechunkWork.ScheduleWork( - Context.WorkerPool, - [&Context, TempFileName, &FilesToDechunk, ChunkedIndex, &Info](std::atomic<bool>& AbortFlag) { - ZEN_TRACE_CPU("DechunkAttachment"); - - auto _ = MakeGuard([&TempFileName] { - std::error_code Ec; - if (IsFile(TempFileName, Ec)) + ParallelWork DechunkWork(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + std::filesystem::path TempFilePath = Context.Oplog.TempPath(); + for (size_t ChunkedIndex = 0; ChunkedIndex < FilesToDechunk.size(); ChunkedIndex++) + { + const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex]; + std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); + DechunkWork.ScheduleWork( + Context.WorkerPool, + [&Context, TempFileName, &FilesToDechunk, ChunkedIndex, &Info](std::atomic<bool>& AbortFlag) { + ZEN_TRACE_CPU("DechunkAttachment"); + + auto _ = MakeGuard([&TempFileName] { + std::error_code Ec; + if (IsFile(TempFileName, Ec)) + { + RemoveFile(TempFileName, Ec); + if (Ec) { - RemoveFile(TempFileName, Ec); - if (Ec) - { - ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); - } + ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); } - }); - const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex]; + } + }); + const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex]; - try + try + { + if (AbortFlag.load()) + { + return; + } + Stopwatch Timer; + + IoBuffer TmpBuffer; { - if (AbortFlag.load()) + BasicFile TmpFile; + std::error_code Ec; + TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate, Ec); + if (Ec) { - return; + throw RemoteStoreError( + "Write error", + gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to open temp file {} for chunked attachment {}", TempFileName, Chunked.RawHash)); } - Stopwatch Timer; - - IoBuffer TmpBuffer; + else { - BasicFile TmpFile; - std::error_code Ec; - TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate, Ec); - if (Ec) - { - throw RemoteStoreError("Write error", - gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to open temp file {} for chunked attachment {}", - TempFileName, - Chunked.RawHash)); - } - else + BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); + + uint64_t ChunkOffset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); + BLAKE3Stream HashingStream; + for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { - BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); + if (AbortFlag.load()) + { + return; + } - uint64_t ChunkOffset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); - BLAKE3Stream HashingStream; - for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) + const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; + IoBuffer Chunk = Context.ChunkStore.FindChunkByCid(ChunkHash); + if (!Chunk) { - if (AbortFlag.load()) + remotestore_impl::ReportMessage( + Context.OptionalJobContext, + fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!Context.IgnoreMissingAttachments) { - return; + throw RemoteStoreError( + "Missing chunk", + gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); } + return; + } + + IoHash RawHash; + uint64_t RawSize; - const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; - IoBuffer Chunk = Context.ChunkStore.FindChunkByCid(ChunkHash); - if (!Chunk) + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); + if (RawHash != ChunkHash || !Compressed) + { + std::string Message = + Compressed ? fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", + RawHash, + ChunkHash, + Chunked.RawHash) + : fmt::format("Malformed data for chunk {} for chunked attachment {}", + ChunkHash, + Chunked.RawHash); + remotestore_impl::ReportMessage(Context.OptionalJobContext, Message); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!Context.IgnoreMissingAttachments) + { + throw RemoteStoreError("Missing chunk", gsl::narrow<int>(HttpResponseCode::NotFound), Message); + } + return; + } + + { + ZEN_TRACE_CPU("DecompressChunk"); + + if (!Compressed.DecompressToStream( + 0, + RawSize, + [&](uint64_t SourceOffset, + uint64_t SourceSize, + uint64_t Offset, + const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + MemoryView SegmentData = Segment.GetView(); + HashingStream.Append(SegmentData); + TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), ChunkOffset + Offset); + } + return true; + })) { remotestore_impl::ReportMessage( Context.OptionalJobContext, - fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + fmt::format("Failed to decompress chunk {} for chunked attachment {}", + ChunkHash, + Chunked.RawHash)); // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); @@ -4301,189 +4351,96 @@ LoadOplog(LoadOplogContext&& Context) throw RemoteStoreError( "Missing chunk", gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); - } - return; - } - - IoHash RawHash; - uint64_t RawSize; - - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); - if (RawHash != ChunkHash || !Compressed) - { - std::string Message = - Compressed ? fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", - RawHash, - ChunkHash, - Chunked.RawHash) - : fmt::format("Malformed data for chunk {} for chunked attachment {}", - ChunkHash, - Chunked.RawHash); - remotestore_impl::ReportMessage(Context.OptionalJobContext, Message); - - // We only add 1 as the resulting missing count will be 1 for the dechunked file - Info.MissingAttachmentCount.fetch_add(1); - if (!Context.IgnoreMissingAttachments) - { - throw RemoteStoreError("Missing chunk", - gsl::narrow<int>(HttpResponseCode::NotFound), - Message); - } - return; - } - - { - ZEN_TRACE_CPU("DecompressChunk"); - - if (!Compressed.DecompressToStream(0, - RawSize, - [&](uint64_t SourceOffset, - uint64_t SourceSize, - uint64_t Offset, - const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset, SourceSize, Offset); - - for (const SharedBuffer& Segment : - RangeBuffer.GetSegments()) - { - MemoryView SegmentData = Segment.GetView(); - HashingStream.Append(SegmentData); - TmpWriter.Write(SegmentData.GetData(), - SegmentData.GetSize(), - ChunkOffset + Offset); - } - return true; - })) - { - remotestore_impl::ReportMessage( - Context.OptionalJobContext, fmt::format("Failed to decompress chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); - - // We only add 1 as the resulting missing count will be 1 for the dechunked file - Info.MissingAttachmentCount.fetch_add(1); - if (!Context.IgnoreMissingAttachments) - { - throw RemoteStoreError( - "Missing chunk", - gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to decompress chunk {} for chunked attachment {}", - ChunkHash, - Chunked.RawHash)); - } - return; } + return; } - ChunkOffset += RawSize; } - BLAKE3 RawHash = HashingStream.GetHash(); - ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); - UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); - TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); + ChunkOffset += RawSize; } - TmpFile.Close(); - TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); - } - uint64_t TmpBufferSize = TmpBuffer.GetSize(); - CidStore::InsertResult InsertResult = - Context.ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(TmpBufferSize); - Info.AttachmentsStored.fetch_add(1); + BLAKE3 RawHash = HashingStream.GetHash(); + ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); + UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); + TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); } - - ZEN_INFO("Dechunked attachment {} ({}) in {}", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + TmpFile.Close(); + TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); } - catch (const RemoteStoreError&) + uint64_t TmpBufferSize = TmpBuffer.GetSize(); + CidStore::InsertResult InsertResult = + Context.ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); + if (InsertResult.New) { - throw; + Info.AttachmentBytesStored.fetch_add(TmpBufferSize); + Info.AttachmentsStored.fetch_add(1); } - catch (const std::exception& Ex) - { - throw RemoteStoreError(fmt::format("Failed to dechunk file {}", Chunked.RawHash), - gsl::narrow<int>(HttpResponseCode::InternalServerError), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - Stopwatch DechunkProgressTimer; - try - { - DechunkWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t Remaining) { - if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag) + ZEN_INFO("Dechunked attachment {} ({}) in {}", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + catch (const RemoteStoreError&) { - AbortFlag = true; + throw; } - remotestore_impl::ReportProgress(Context.OptionalJobContext, - "Dechunking attachments"sv, - fmt::format("{} remaining...", Remaining), - FilesToDechunk.size(), - Remaining, - DechunkProgressTimer.GetElapsedTimeMs()); - }); - } - catch (const RemoteStoreError& Ex) - { - if (!LoadResult.ErrorCode) - { - LoadResult = {.ErrorCode = Ex.GetErrorCode(), .Reason = Ex.what(), .Text = std::string(Ex.GetText())}; - } - } - catch (const std::exception& Ex) + catch (const std::exception& Ex) + { + throw RemoteStoreError(fmt::format("Failed to dechunk file {}", Chunked.RawHash), + gsl::narrow<int>(HttpResponseCode::InternalServerError), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + + Stopwatch DechunkProgressTimer; + DechunkWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t Remaining) { + if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag) { - if (!LoadResult.ErrorCode) - { - LoadResult = {.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), .Reason = Ex.what()}; - } + AbortFlag = true; } remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, - ""sv, + fmt::format("{} remaining...", Remaining), FilesToDechunk.size(), - 0, + Remaining, DechunkProgressTimer.GetElapsedTimeMs()); - } + }); + remotestore_impl::ReportProgress(Context.OptionalJobContext, + "Dechunking attachments"sv, + ""sv, + FilesToDechunk.size(), + 0, + DechunkProgressTimer.GetElapsedTimeMs()); } - Result = LoadResult; - - if (Result.ErrorCode == 0) + if (Context.CleanOplog) { - if (Context.CleanOplog) + if (Context.OptionalCache) { - if (Context.OptionalCache) - { - Context.OptionalCache->Flush(100, [](intptr_t) { return /*DontWaitForPendingOperation*/ false; }); - } - if (!Context.Oplog.Reset()) - { - Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Reason = fmt::format("Failed to clean existing oplog '{}'", Context.Oplog.OplogId())}; - remotestore_impl::ReportMessage(Context.OptionalJobContext, - fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); - } + Context.OptionalCache->Flush(100, [](intptr_t) { return /*DontWaitForPendingOperation*/ false; }); } - if (Result.ErrorCode == 0) + if (!Context.Oplog.Reset()) { - Result = remotestore_impl::WriteOplogSection(Context.Oplog, OplogSection, Context.OptionalJobContext); - if (Result.ErrorCode) - { - remotestore_impl::ReportMessage(Context.OptionalJobContext, - fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); - } + std::string Reason = fmt::format("Failed to clean existing oplog '{}'", Context.Oplog.OplogId()); + remotestore_impl::ReportMessage( + Context.OptionalJobContext, + fmt::format("Aborting ({}): {}", gsl::narrow<int>(HttpResponseCode::InternalServerError), Reason)); + throw RemoteStoreError(Reason, gsl::narrow<int>(HttpResponseCode::InternalServerError), ""); + } + } + { + RemoteProjectStore::Result WriteResult = + remotestore_impl::WriteOplogSection(Context.Oplog, OplogSection, Context.OptionalJobContext); + if (WriteResult.ErrorCode) + { + remotestore_impl::ReportMessage(Context.OptionalJobContext, + fmt::format("Aborting ({}): {}", WriteResult.ErrorCode, WriteResult.Reason)); + throw RemoteStoreError(WriteResult.Reason, WriteResult.ErrorCode, WriteResult.Text); } } - - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; remotestore_impl::LogRemoteStoreStatsDetails(Context.RemoteStore.GetStats()); @@ -4525,8 +4482,8 @@ LoadOplog(LoadOplogContext&& Context) fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} " "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}", RemoteStoreInfo.ContainerName, - Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + "SUCCESS", + NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())), NiceBytes(Info.OplogSizeBytes), Info.AttachmentBlocksDownloaded.load(), NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), @@ -4540,8 +4497,6 @@ LoadOplog(LoadOplogContext&& Context) NiceBytes(Info.AttachmentBytesStored.load()), Info.MissingAttachmentCount.load(), remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS))); - - return Result; } ChunkedInfo @@ -4891,65 +4846,64 @@ TEST_CASE_TEMPLATE("project.store.export", int OpJobIndex = 0; TestJobContext OpJobContext(OpJobIndex); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = Oid::Zero, - .Oplog = *OplogImport, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &OpJobContext}); - CHECK(ImportResult.ErrorCode == 0); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); + OpJobIndex++; - RemoteProjectStore::Result ImportForceResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = Oid::Zero, - .Oplog = *OplogImport, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = true, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &OpJobContext}); - CHECK(ImportForceResult.ErrorCode == 0); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); + OpJobIndex++; - RemoteProjectStore::Result ImportCleanResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = Oid::Zero, - .Oplog = *OplogImport, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = true, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &OpJobContext}); - CHECK(ImportCleanResult.ErrorCode == 0); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); + OpJobIndex++; - RemoteProjectStore::Result ImportForceCleanResult = - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = Oid::Zero, - .Oplog = *OplogImport, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = true, - .IgnoreMissingAttachments = false, - .CleanOplog = true, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &OpJobContext}); - CHECK(ImportForceCleanResult.ErrorCode == 0); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .CleanOplog = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); + OpJobIndex++; } @@ -5247,75 +5201,68 @@ TEST_CASE("project.store.import.context_settings") double CacheLatency, uint64_t CacheRanges, bool PopulateCache, - bool ForceDownload) -> RemoteProjectStore::Result { + bool ForceDownload) -> void { Ref<ProjectStore::Oplog> ImportOplog = ImportProject->NewOplog(fmt::format("import_{}", OpJobIndex++), {}); - return LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, - .RemoteStore = *RemoteStore, - .OptionalCache = OptCache, - .CacheBuildId = CacheBuildId, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = ForceDownload, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = Mode, - .PopulateCache = PopulateCache, - .StoreLatencySec = StoreLatency, - .StoreMaxRangeCountPerRequest = StoreRanges, - .CacheLatencySec = CacheLatency, - .CacheMaxRangeCountPerRequest = CacheRanges, - .OptionalJobContext = &OpJobContext}); + LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = OptCache, + .CacheBuildId = CacheBuildId, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = ForceDownload, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = Mode, + .PopulateCache = PopulateCache, + .StoreLatencySec = StoreLatency, + .StoreMaxRangeCountPerRequest = StoreRanges, + .CacheLatencySec = CacheLatency, + .CacheMaxRangeCountPerRequest = CacheRanges, + .OptionalJobContext = &OpJobContext}); }; // Shorthand: Mode=All, low latency, 128 ranges for both store and cache. - auto ImportAll = [&](BuildStorageCache* OptCache, bool Populate, bool Force) { - return DoImport(OptCache, EPartialBlockRequestMode::All, 0.001, 128u, 0.001, 128u, Populate, Force); + auto ImportAll = [&](BuildStorageCache* OptCache, bool Populate, bool Force) -> void { + DoImport(OptCache, EPartialBlockRequestMode::All, 0.001, 128u, 0.001, 128u, Populate, Force); }; SUBCASE("mode_off_no_cache") { // Baseline: no partial block requests, no cache. - RemoteProjectStore::Result R = - DoImport(nullptr, EPartialBlockRequestMode::Off, -1.0, (uint64_t)-1, -1.0, (uint64_t)-1, false, false); - CHECK(R.ErrorCode == 0); + DoImport(nullptr, EPartialBlockRequestMode::Off, -1.0, (uint64_t)-1, -1.0, (uint64_t)-1, false, false); } SUBCASE("mode_all_multirange_cloud_no_cache") { // StoreMaxRangeCountPerRequest > 1 → MultiRange cloud path. - RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 128u, -1.0, 0u, false, false); - CHECK(R.ErrorCode == 0); + DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 128u, -1.0, 0u, false, false); } SUBCASE("mode_all_singlerange_cloud_no_cache") { // StoreMaxRangeCountPerRequest == 1 → SingleRange cloud path. - RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 1u, -1.0, 0u, false, false); - CHECK(R.ErrorCode == 0); + DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 1u, -1.0, 0u, false, false); } SUBCASE("mode_mixed_high_latency_no_cache") { // High store latency encourages range merging; Mixed uses SingleRange for cloud, Off for cache. - RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::Mixed, 0.1, 128u, -1.0, 0u, false, false); - CHECK(R.ErrorCode == 0); + DoImport(nullptr, EPartialBlockRequestMode::Mixed, 0.1, 128u, -1.0, 0u, false, false); } SUBCASE("cache_populate_and_hit") { // First import: ImportCidStore is empty so all blocks are downloaded from the remote store // and written to the cache. - RemoteProjectStore::Result PopulateResult = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); - CHECK(PopulateResult.ErrorCode == 0); + ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); CHECK(CacheStats.PutBlobCount > 0); // Re-import with ForceDownload=true: all chunks are now in ImportCidStore but Force overrides // HasAttachment() so the download logic re-runs and serves blocks from the cache instead of // the remote store. ResetCacheStats(); - RemoteProjectStore::Result HitResult = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); - CHECK(HitResult.ErrorCode == 0); + ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); CHECK(CacheStats.PutBlobCount == 0); // TotalRequestCount covers both full-blob cache hits and partial-range cache hits. CHECK(CacheStats.TotalRequestCount > 0); @@ -5325,8 +5272,7 @@ TEST_CASE("project.store.import.context_settings") { // Cache is provided but PopulateCache=false: blocks are downloaded to ImportCidStore but // nothing should be written to the cache. - RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/false); - CHECK(R.ErrorCode == 0); + ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/false); CHECK(CacheStats.PutBlobCount == 0); } @@ -5337,12 +5283,10 @@ TEST_CASE("project.store.import.context_settings") // FullBlockIndexes and GetBuildBlob (full blob) is called from the cache. // CacheMaxRangeCountPerRequest > 1 would route partial downloads through GetBuildBlobRanges // if the analyser ever emits BlockRanges entries. - RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); - CHECK(Populate.ErrorCode == 0); + ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); ResetCacheStats(); - RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 128u, false, true); - CHECK(R.ErrorCode == 0); + DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 128u, false, true); CHECK(CacheStats.TotalRequestCount > 0); } @@ -5353,24 +5297,20 @@ TEST_CASE("project.store.import.context_settings") // download path), which calls GetBuildBlob with no range offset — a full-blob cache hit. // The single-range vs multi-range distinction only matters for the partial-block (BlockRanges) // path, which is not reached when all chunks are needed. - RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); - CHECK(Populate.ErrorCode == 0); + ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); ResetCacheStats(); - RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 1u, false, true); - CHECK(R.ErrorCode == 0); + DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 1u, false, true); CHECK(CacheStats.TotalRequestCount > 0); } SUBCASE("mode_all_cache_and_cloud_multirange") { // Pre-populate cache; All mode uses multi-range for both the cache and cloud paths. - RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); - CHECK(Populate.ErrorCode == 0); + ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); ResetCacheStats(); - RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); - CHECK(R.ErrorCode == 0); + ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); CHECK(CacheStats.TotalRequestCount > 0); } @@ -5388,25 +5328,24 @@ TEST_CASE("project.store.import.context_settings") SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash); // StoreMaxRangeCountPerRequest=128 → all three ranges sent in one LoadAttachmentRanges call. - Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_multi_{}", OpJobIndex++), {}); - RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, - .RemoteStore = *PartialRemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = CacheBuildId, - .Oplog = *PartialOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::All, - .PopulateCache = false, - .StoreLatencySec = 0.001, - .StoreMaxRangeCountPerRequest = 128u, - .CacheLatencySec = -1.0, - .CacheMaxRangeCountPerRequest = 0u, - .OptionalJobContext = &OpJobContext}); - CHECK(R.ErrorCode == 0); + Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_multi_{}", OpJobIndex++), {}); + LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = CacheBuildId, + .Oplog = *PartialOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = -1.0, + .CacheMaxRangeCountPerRequest = 0u, + .OptionalJobContext = &OpJobContext}); } SUBCASE("partial_block_cloud_singlerange") @@ -5421,25 +5360,24 @@ TEST_CASE("project.store.import.context_settings") CHECK(BlockHash != IoHash::Zero); SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash); - Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_single_{}", OpJobIndex++), {}); - RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, - .RemoteStore = *PartialRemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = CacheBuildId, - .Oplog = *PartialOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::All, - .PopulateCache = false, - .StoreLatencySec = 0.001, - .StoreMaxRangeCountPerRequest = 1u, - .CacheLatencySec = -1.0, - .CacheMaxRangeCountPerRequest = 0u, - .OptionalJobContext = &OpJobContext}); - CHECK(R.ErrorCode == 0); + Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_single_{}", OpJobIndex++), {}); + LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = CacheBuildId, + .Oplog = *PartialOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 1u, + .CacheLatencySec = -1.0, + .CacheMaxRangeCountPerRequest = 0u, + .OptionalJobContext = &OpJobContext}); } SUBCASE("partial_block_cache_multirange") @@ -5454,25 +5392,25 @@ TEST_CASE("project.store.import.context_settings") // Phase 1: ImportCidStore starts empty → full block download from remote → PutBuildBlob // populates the cache. { - Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p1_{}", OpJobIndex++), {}); - RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, - .RemoteStore = *PartialRemoteStore, - .OptionalCache = Cache.get(), - .CacheBuildId = CacheBuildId, - .Oplog = *Phase1Oplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::All, - .PopulateCache = true, - .StoreLatencySec = 0.001, - .StoreMaxRangeCountPerRequest = 128u, - .CacheLatencySec = 0.001, - .CacheMaxRangeCountPerRequest = 128u, - .OptionalJobContext = &OpJobContext}); - CHECK(Phase1R.ErrorCode == 0); + Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p1_{}", OpJobIndex++), {}); + LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase1Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = true, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 128u, + .OptionalJobContext = &OpJobContext}); + CHECK(CacheStats.PutBlobCount > 0); } ResetCacheStats(); @@ -5489,25 +5427,25 @@ TEST_CASE("project.store.import.context_settings") Phase2CidStore.Initialize(Phase2CidConfig); SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); - Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p2_{}", OpJobIndex++), {}); - RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, - .RemoteStore = *PartialRemoteStore, - .OptionalCache = Cache.get(), - .CacheBuildId = CacheBuildId, - .Oplog = *Phase2Oplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, - .PopulateCache = false, - .StoreLatencySec = 0.001, - .StoreMaxRangeCountPerRequest = 128u, - .CacheLatencySec = 0.001, - .CacheMaxRangeCountPerRequest = 128u, - .OptionalJobContext = &OpJobContext}); - CHECK(Phase2R.ErrorCode == 0); + Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p2_{}", OpJobIndex++), {}); + LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase2Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 128u, + .OptionalJobContext = &OpJobContext}); + CHECK(CacheStats.TotalRequestCount > 0); } @@ -5522,25 +5460,25 @@ TEST_CASE("project.store.import.context_settings") // Phase 1: full block download from remote into cache. { - Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p1_{}", OpJobIndex++), {}); - RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, - .RemoteStore = *PartialRemoteStore, - .OptionalCache = Cache.get(), - .CacheBuildId = CacheBuildId, - .Oplog = *Phase1Oplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::All, - .PopulateCache = true, - .StoreLatencySec = 0.001, - .StoreMaxRangeCountPerRequest = 128u, - .CacheLatencySec = 0.001, - .CacheMaxRangeCountPerRequest = 128u, - .OptionalJobContext = &OpJobContext}); - CHECK(Phase1R.ErrorCode == 0); + Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p1_{}", OpJobIndex++), {}); + LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase1Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = true, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 128u, + .OptionalJobContext = &OpJobContext}); + CHECK(CacheStats.PutBlobCount > 0); } ResetCacheStats(); @@ -5556,25 +5494,25 @@ TEST_CASE("project.store.import.context_settings") Phase2CidStore.Initialize(Phase2CidConfig); SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); - Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p2_{}", OpJobIndex++), {}); - RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, - .RemoteStore = *PartialRemoteStore, - .OptionalCache = Cache.get(), - .CacheBuildId = CacheBuildId, - .Oplog = *Phase2Oplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, - .PopulateCache = false, - .StoreLatencySec = 0.001, - .StoreMaxRangeCountPerRequest = 128u, - .CacheLatencySec = 0.001, - .CacheMaxRangeCountPerRequest = 1u, - .OptionalJobContext = &OpJobContext}); - CHECK(Phase2R.ErrorCode == 0); + Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p2_{}", OpJobIndex++), {}); + LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase2Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 1u, + .OptionalJobContext = &OpJobContext}); + CHECK(CacheStats.TotalRequestCount > 0); } } @@ -5750,15 +5688,14 @@ TEST_CASE("project.store.embed_loose_files_true") /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_true_import", {}); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); - CHECK(ImportResult.ErrorCode == 0); + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_true_import", {}); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.embed_loose_files_false") @@ -5807,15 +5744,14 @@ TEST_CASE("project.store.embed_loose_files_false") /*ForceDisableBlocks=*/false, &RemoteStore); - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_false_import", {}); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); - CHECK(ImportResult.ErrorCode == 0); + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_false_import", {}); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.missing_attachment_ignored") @@ -5983,15 +5919,14 @@ TEST_CASE("project.store.export.large_file_attachment_direct") /*ForceDisableBlocks=*/false, &RemoteStore); - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_direct_import", {}); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); - CHECK(ImportResult.ErrorCode == 0); + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_direct_import", {}); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.large_file_attachment_via_temp") @@ -6046,15 +5981,14 @@ TEST_CASE("project.store.export.large_file_attachment_via_temp") /*ForceDisableBlocks=*/false, &RemoteStore); - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_via_temp_import", {}); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); - CHECK(ImportResult.ErrorCode == 0); + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_via_temp_import", {}); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.large_chunk_from_cidstore") @@ -6105,15 +6039,14 @@ TEST_CASE("project.store.export.large_chunk_from_cidstore") /*ForceDisableBlocks=*/false, &RemoteStore); - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_cid_import", {}); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); - CHECK(ImportResult.ErrorCode == 0); + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_cid_import", {}); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.block_reuse") @@ -6265,15 +6198,14 @@ TEST_CASE("project.store.export.max_chunks_per_block") CHECK(KnownBlocks.Blocks.size() >= 2); // Round-trip. - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_max_chunks_import", {}); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); - CHECK(ImportResult.ErrorCode == 0); + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_max_chunks_import", {}); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.max_data_per_block") @@ -6356,15 +6288,14 @@ TEST_CASE("project.store.export.max_data_per_block") CHECK(KnownBlocks.Blocks.size() >= 2); // Round-trip. - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_max_data_per_block_import", {}); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); - CHECK(ImportResult.ErrorCode == 0); + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_max_data_per_block_import", {}); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.file_deleted_between_phases") @@ -6500,15 +6431,14 @@ TEST_CASE("project.store.embed_loose_files_zero_data_hash") &RemoteStore); // Round-trip: the resolved attachment data must be recoverable. - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_zero_data_hash_import", {}); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); - CHECK(ImportResult.ErrorCode == 0); + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_zero_data_hash_import", {}); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.embed_loose_files_already_resolved") @@ -6564,15 +6494,14 @@ TEST_CASE("project.store.embed_loose_files_already_resolved") // Import: WriteOplogSection copies the rewritten ops (with pre-resolved "data" // BinaryAttachment fields) into the local oplog; attachment data lands in CidStore. - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_already_resolved_import", {}); - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore1, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); - REQUIRE(ImportResult.ErrorCode == 0); + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_already_resolved_import", {}); + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore1, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); // Re-export: the imported oplog ops carry "data": BinaryAttachment(H) (DataHash != Zero). // RewriteOp copies those entries via line 1873 without re-reading files from disk. @@ -6656,17 +6585,17 @@ TEST_CASE("project.store.import.error.missing_attachment") Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_import", {}); REQUIRE(ImportOplog); - CapturingJobContext Ctx; - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = true, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &Ctx}); - CHECK(ImportResult.ErrorCode != 0); + CapturingJobContext Ctx; + CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &Ctx}), + RemoteStoreError); } TEST_CASE("project.store.import.ignore_missing_attachment") @@ -6731,17 +6660,64 @@ TEST_CASE("project.store.import.ignore_missing_attachment") Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_ignore_missing_import", {}); REQUIRE(ImportOplog); - CapturingJobContext Ctx; - RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = true, - .IgnoreMissingAttachments = true, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &Ctx}); - CHECK(ImportResult.ErrorCode == 0); + CapturingJobContext Ctx; + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &Ctx}); +} + +TEST_CASE("project.store.import.error.load_container_failure") +{ + // Point a FileRemoteProjectStore at a nonexistent directory so that + // LoadContainer() returns a non-zero ErrorCode, then verify that + // LoadOplog throws a RemoteStoreError. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + // Point to a path that does not contain a valid oplog container. + std::filesystem::path NonExistentPath = TempDir.Path() / "does_not_exist"; + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024u, + .MaxChunksPerBlock = 1000, + .MaxChunkEmbedSize = 32u * 1024u, + .ChunkFileSizeLimit = 64u * 1024u * 1024u}, + /*.FolderPath =*/NonExistentPath, + /*.Name =*/"load_container_failure", + /*.OptionalBaseName =*/std::string(), + /*.ForceDisableBlocks =*/false, + /*.ForceEnableTempBlocks =*/false}; + std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options); + + uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); + uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; + WorkerThreadPool WorkerPool(WorkerCount); + WorkerThreadPool NetworkPool(NetworkWorkerCount); + + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("load_container_failure_import", {}); + REQUIRE(ImportOplog); + + CapturingJobContext Ctx; + CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &Ctx}), + RemoteStoreError); } TEST_CASE("project.store.blockcomposer.path_a_standalone_block") diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp index 8c34e09d7..99dc5717d 100644 --- a/src/zenserver/storage/projectstore/httpprojectstore.cpp +++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp @@ -2869,34 +2869,27 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) try { - RemoteProjectStore::Result Result = LoadOplog(LoadOplogContext{ - .ChunkStore = m_CidStore, - .RemoteStore = *RemoteStoreResult->Store, - .OptionalCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Cache.get() : nullptr, - .CacheBuildId = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->BuildsId : Oid::Zero, - .OptionalCacheStats = RemoteStoreResult->OptionalCache ? &RemoteStoreResult->OptionalCache->Stats : nullptr, - .Oplog = *Oplog, - .NetworkWorkerPool = NetworkWorkerPool, - .WorkerPool = WorkerPool, - .ForceDownload = Force, - .IgnoreMissingAttachments = IgnoreMissingAttachments, - .CleanOplog = CleanOplog, - .PartialBlockRequestMode = PartialBlockRequestMode, - .PopulateCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Populate : false, - .StoreLatencySec = RemoteStoreResult->LatencySec, - .StoreMaxRangeCountPerRequest = RemoteStoreResult->MaxRangeCountPerRequest, - .CacheLatencySec = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->LatencySec : -1.0, - .CacheMaxRangeCountPerRequest = - RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->MaxRangeCountPerRequest : 0, - .OptionalJobContext = &Context}); - auto Response = ConvertResult(Result); - ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw JobError( - Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, - (int)Response.first); - } + LoadOplog(LoadOplogContext{ + .ChunkStore = m_CidStore, + .RemoteStore = *RemoteStoreResult->Store, + .OptionalCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Cache.get() : nullptr, + .CacheBuildId = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->BuildsId : Oid::Zero, + .OptionalCacheStats = RemoteStoreResult->OptionalCache ? &RemoteStoreResult->OptionalCache->Stats : nullptr, + .Oplog = *Oplog, + .NetworkWorkerPool = NetworkWorkerPool, + .WorkerPool = WorkerPool, + .ForceDownload = Force, + .IgnoreMissingAttachments = IgnoreMissingAttachments, + .CleanOplog = CleanOplog, + .PartialBlockRequestMode = PartialBlockRequestMode, + .PopulateCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Populate : false, + .StoreLatencySec = RemoteStoreResult->LatencySec, + .StoreMaxRangeCountPerRequest = RemoteStoreResult->MaxRangeCountPerRequest, + .CacheLatencySec = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->LatencySec : -1.0, + .CacheMaxRangeCountPerRequest = + RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->MaxRangeCountPerRequest : 0, + .OptionalJobContext = &Context}); + ZEN_INFO("LoadOplog: Complete"); } catch (const HttpClientError& HttpEx) { |