diff options
Diffstat (limited to 'src/zenremotestore/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 441 |
1 files changed, 373 insertions, 68 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 1a9dc10ef..f43f0813a 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -8,6 +8,8 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/logging/broadcastsink.h> +#include <zencore/logging/logger.h> #include <zencore/parallelwork.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> @@ -18,8 +20,9 @@ #include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/chunking/chunkedcontent.h> #include <zenremotestore/chunking/chunkedfile.h> -#include <zenremotestore/operationlogoutput.h> #include <zenstore/cidstore.h> +#include <zenutil/logging.h> +#include <zenutil/progress.h> #include <numeric> #include <unordered_map> @@ -392,7 +395,10 @@ namespace remotestore_impl { OodleCompressor Compressor, OodleCompressionLevel CompressionLevel) { - ZEN_ASSERT(!IsFile(AttachmentPath)); + if (IsFile(AttachmentPath)) + { + ZEN_WARN("Temp attachment file already exists at '{}', truncating", AttachmentPath); + } BasicFile CompressedFile; std::error_code Ec; CompressedFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, Ec); @@ -448,6 +454,7 @@ namespace remotestore_impl { }; CbObject RewriteOplog( + LoggerRef InLog, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, bool IgnoreMissingAttachments, @@ -456,6 +463,7 @@ namespace remotestore_impl { std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher>& UploadAttachments, // TODO: Rename to OutUploadAttachments JobContext* OptionalContext) { + ZEN_SCOPED_LOG(InLog); size_t OpCount = 0; CreateDirectories(AttachmentTempPath); @@ -929,7 +937,6 @@ namespace remotestore_impl { { return; } - ZEN_ASSERT(UploadAttachment->Size != 0); if (!UploadAttachment->RawPath.empty()) { if (UploadAttachment->Size > (MaxChunkEmbedSize * 2)) @@ -1140,31 +1147,51 @@ namespace remotestore_impl { std::atomic<uint64_t> ChunksCompleteCount = 0; }; - class JobContextLogOutput : public OperationLogOutput + class JobContextSink : public logging::Sink { public: - JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {} - virtual void EmitLogMessage(const logging::LogPoint& Point, fmt::format_args Args) override + explicit JobContextSink(JobContext* Context) : m_Context(Context) {} + + void Log(const logging::LogMessage& Msg) override { - if (m_OptionalContext) + if (m_Context) { - fmt::basic_memory_buffer<char, 250> MessageBuffer; - fmt::vformat_to(fmt::appender(MessageBuffer), Point.FormatString, Args); - remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size())); + m_Context->ReportMessage(Msg.GetPayload()); } } - virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); } - virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); } - virtual uint32_t GetProgressUpdateDelayMS() override { return 0; } - virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override + void Flush() override {} + void SetFormatter(std::unique_ptr<logging::Formatter>) override {} + + private: + JobContext* m_Context; + }; + + class JobContextLogger + { + public: + explicit JobContextLogger(JobContext* OptionalContext) { - ZEN_UNUSED(InSubTask); - return nullptr; + if (!OptionalContext) + { + return; + } + logging::SinkPtr ContextSink(new JobContextSink(OptionalContext)); + Ref<logging::BroadcastSink> DefaultSink = GetDefaultBroadcastSink(); + std::vector<logging::SinkPtr> Sinks; + if (DefaultSink) + { + Sinks.push_back(DefaultSink); + } + Sinks.push_back(std::move(ContextSink)); + Ref<logging::BroadcastSink> Broadcast(new logging::BroadcastSink(std::move(Sinks))); + m_Log = Ref<logging::Logger>(new logging::Logger("jobcontext", Broadcast)); } + LoggerRef Log() const { return m_Log ? LoggerRef(*m_Log) : zen::Log(); } + private: - JobContext* m_OptionalContext; + Ref<logging::Logger> m_Log; }; void DownloadAndSaveBlockChunks(LoadOplogContext& Context, @@ -1185,6 +1212,7 @@ namespace remotestore_impl { &LoadAttachmentsTimer, &DownloadStartMS](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DownloadBlockChunks"); + ZEN_SCOPED_LOG(Context.Log); if (AbortFlag) { @@ -1300,6 +1328,7 @@ namespace remotestore_impl { &AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DownloadBlock"); + ZEN_SCOPED_LOG(Context.Log); if (AbortFlag) { @@ -1366,6 +1395,7 @@ namespace remotestore_impl { &AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, Bytes = std::move(BlobBuffer)](std::atomic<bool>& AbortFlag) { + ZEN_SCOPED_LOG(Context.Log); if (AbortFlag) { return; @@ -1715,6 +1745,7 @@ namespace remotestore_impl { ChunkDownloadedFlags, RetriesLeft](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DownloadBlockRanges"); + ZEN_SCOPED_LOG(Context.Log); try { uint64_t Unset = (std::uint64_t)-1; @@ -1760,6 +1791,7 @@ namespace remotestore_impl { OffsetAndLengths = std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())]( std::atomic<bool>& AbortFlag) { + ZEN_SCOPED_LOG(Context.Log); try { ZEN_ASSERT(BlockPayload.Size() > 0); @@ -1972,6 +2004,7 @@ namespace remotestore_impl { Context.NetworkWorkerPool, [&Context, &AttachmentWork, RawHash, &LoadAttachmentsTimer, &DownloadStartMS, &Info](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DownloadAttachment"); + ZEN_SCOPED_LOG(Context.Log); if (AbortFlag) { @@ -2061,7 +2094,8 @@ namespace remotestore_impl { WorkerThreadPool::EMode::EnableBacklog); }; - void AsyncCreateBlock(ParallelWork& Work, + void AsyncCreateBlock(LoggerRef InLog, + ParallelWork& Work, WorkerThreadPool& WorkerPool, std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, RwLock& SectionsLock, @@ -2071,9 +2105,10 @@ namespace remotestore_impl { JobContext* OptionalContext) { Work.ScheduleWork(WorkerPool, - [&Blocks, &SectionsLock, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, OptionalContext]( + [InLog, &Blocks, &SectionsLock, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, OptionalContext]( std::atomic<bool>& AbortFlag) mutable { ZEN_TRACE_CPU("CreateBlock"); + ZEN_SCOPED_LOG(InLog); if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -2452,7 +2487,8 @@ GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> Include } CbObject -BuildContainer(CidStore& ChunkStore, +BuildContainer(LoggerRef InLog, + CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, @@ -2472,7 +2508,8 @@ BuildContainer(CidStore& ChunkStore, { using namespace std::literals; - std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext)); + ZEN_SCOPED_LOG(InLog); + remotestore_impl::JobContextLogger JobContextOutput(OptionalContext); Stopwatch Timer; @@ -2485,7 +2522,8 @@ BuildContainer(CidStore& ChunkStore, size_t TotalOpCount = Oplog.GetOplogEntryCount(); Stopwatch RewriteOplogTimer; - CbObject SectionOps = remotestore_impl::RewriteOplog(Project, + CbObject SectionOps = remotestore_impl::RewriteOplog(InLog, + Project, Oplog, IgnoreMissingAttachments, EmbedLooseFiles, @@ -2605,7 +2643,7 @@ BuildContainer(CidStore& ChunkStore, std::vector<uint32_t> UnusedChunkIndexes; ReuseBlocksStatistics ReuseBlocksStats; - ReusedBlockIndexes = FindReuseBlocks(*LogOutput, + ReusedBlockIndexes = FindReuseBlocks(JobContextOutput.Log(), /*BlockReuseMinPercentLimit*/ 80, /*IsVerbose*/ false, ReuseBlocksStats, @@ -2749,7 +2787,8 @@ BuildContainer(CidStore& ChunkStore, .MaxChunkEmbedSize = MaxChunkEmbedSize, .IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }}); - auto OnNewBlock = [&Work, + auto OnNewBlock = [&Log, + &Work, &WorkerPool, BuildBlocks, &BlockCreateProgressTimer, @@ -2774,7 +2813,8 @@ BuildContainer(CidStore& ChunkStore, size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); if (BuildBlocks) { - remotestore_impl::AsyncCreateBlock(Work, + remotestore_impl::AsyncCreateBlock(Log(), + Work, WorkerPool, std::move(ChunksInBlock), BlocksLock, @@ -3007,6 +3047,17 @@ BuildContainer(CidStore& ChunkStore, return {}; } + // Reused blocks were not composed (their chunks were erased from UploadAttachments) but must + // still appear in the container so that a fresh receiver knows to download them. + if (BuildBlocks) + { + for (size_t KnownBlockIndex : ReusedBlockIndexes) + { + const ChunkBlockDescription& Reused = KnownBlocks[KnownBlockIndex]; + Blocks.push_back(Reused); + } + } + CbObjectWriter OplogContainerWriter; RwLock::SharedLockScope _(BlocksLock); OplogContainerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); @@ -3096,7 +3147,8 @@ BuildContainer(CidStore& ChunkStore, } CbObject -BuildContainer(CidStore& ChunkStore, +BuildContainer(LoggerRef InLog, + CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, WorkerThreadPool& WorkerPool, @@ -3112,7 +3164,8 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) { - return BuildContainer(ChunkStore, + return BuildContainer(InLog, + ChunkStore, Project, Oplog, MaxBlockSize, @@ -3132,7 +3185,8 @@ BuildContainer(CidStore& ChunkStore, } void -SaveOplog(CidStore& ChunkStore, +SaveOplog(LoggerRef InLog, + CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, @@ -3149,6 +3203,7 @@ SaveOplog(CidStore& ChunkStore, { using namespace std::literals; + ZEN_SCOPED_LOG(InLog); Stopwatch Timer; remotestore_impl::UploadInfo Info; @@ -3168,8 +3223,8 @@ SaveOplog(CidStore& ChunkStore, std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks; tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; - auto MakeTempBlock = [AttachmentTempPath, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, - ChunkBlockDescription&& Block) { + auto MakeTempBlock = [&Log, AttachmentTempPath, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { std::filesystem::path BlockPath = AttachmentTempPath; BlockPath.append(Block.BlockHash.ToHexString()); IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath); @@ -3180,8 +3235,8 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockSize)); }; - auto UploadBlock = [&RemoteStore, &RemoteStoreInfo, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, - ChunkBlockDescription&& Block) { + auto UploadBlock = [&Log, &RemoteStore, &RemoteStoreInfo, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { IoHash BlockHash = Block.BlockHash; uint64_t CompressedSize = CompressedBlock.GetCompressedSize(); RemoteProjectStore::SaveAttachmentResult Result = @@ -3201,13 +3256,13 @@ SaveOplog(CidStore& ChunkStore, }; std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>> BlockChunks; - auto OnBlockChunks = [&BlockChunks](std::vector<std::pair<IoHash, FetchChunkFunc>>&& Chunks) { + auto OnBlockChunks = [&Log, &BlockChunks](std::vector<std::pair<IoHash, FetchChunkFunc>>&& Chunks) { BlockChunks.push_back({std::make_move_iterator(Chunks.begin()), std::make_move_iterator(Chunks.end())}); ZEN_DEBUG("Found {} block chunks", Chunks.size()); }; - auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash, - TGetAttachmentBufferFunc&& GetBufferFunc) { + auto OnLargeAttachment = [&Log, &AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash, + TGetAttachmentBufferFunc&& GetBufferFunc) { { RwLock::ExclusiveLockScope _(AttachmentsLock); LargeAttachments.insert(AttachmentHash); @@ -3286,7 +3341,8 @@ SaveOplog(CidStore& ChunkStore, } } - CbObject OplogContainerObject = BuildContainer(ChunkStore, + CbObject OplogContainerObject = BuildContainer(InLog, + ChunkStore, Project, Oplog, MaxBlockSize, @@ -3694,7 +3750,8 @@ LoadOplog(LoadOplogContext&& Context) { using namespace std::literals; - std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(Context.OptionalJobContext)); + ZEN_SCOPED_LOG(Context.Log); + remotestore_impl::JobContextLogger JobContextOutput(Context.OptionalJobContext); remotestore_impl::DownloadInfo Info; @@ -3985,7 +4042,7 @@ LoadOplog(LoadOplogContext&& Context) ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size()); ChunkBlockAnalyser PartialAnalyser( - *LogOutput, + JobContextOutput.Log(), BlockDescriptions.Blocks, ChunkBlockAnalyser::Options{.IsQuiet = false, .IsVerbose = false, @@ -4108,10 +4165,10 @@ LoadOplog(LoadOplogContext&& Context) std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); DechunkWork.ScheduleWork( Context.WorkerPool, - [&Context, TempFileName, &FilesToDechunk, ChunkedIndex, &Info](std::atomic<bool>& AbortFlag) { + [&Log, &Context, TempFileName, &FilesToDechunk, ChunkedIndex, &Info](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DechunkAttachment"); - auto _ = MakeGuard([&TempFileName] { + auto _ = MakeGuard([&Log, &TempFileName] { std::error_code Ec; if (IsFile(TempFileName, Ec)) { @@ -4712,7 +4769,8 @@ TEST_CASE_TEMPLATE("project.store.export", WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; - SaveOplog(CidStore, + SaveOplog(Log(), + CidStore, *RemoteStore, *Project.Get(), *Oplog, @@ -4732,7 +4790,8 @@ TEST_CASE_TEMPLATE("project.store.export", CapturingJobContext Ctx; auto DoLoad = [&](bool Force, bool Clean) { - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .OptionalCache = nullptr, .CacheBuildId = Oid::Zero, @@ -4793,7 +4852,8 @@ SetupExportStore(CidStore& CidStore, /*.ForceEnableTempBlocks =*/false}; std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options); - SaveOplog(CidStore, + SaveOplog(Log(), + CidStore, *RemoteStore, Project, *Oplog, @@ -4856,7 +4916,8 @@ SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool, WorkerThreadPool& Wo /*.ForceDisableBlocks =*/false, /*.ForceEnableTempBlocks =*/false}; std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options); - SaveOplog(LocalCidStore, + SaveOplog(Log(), + LocalCidStore, *RemoteStore, *LocalProject, *Oplog, @@ -5024,7 +5085,8 @@ TEST_CASE("project.store.import.context_settings") bool PopulateCache, bool ForceDownload) -> void { Ref<ProjectStore::Oplog> ImportOplog = ImportProject->NewOplog(fmt::format("import_{}", OpJobIndex++), {}); - LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = ImportCidStore, .RemoteStore = *RemoteStore, .OptionalCache = OptCache, .CacheBuildId = CacheBuildId, @@ -5131,7 +5193,8 @@ TEST_CASE("project.store.import.context_settings") // StoreMaxRangeCountPerRequest=128 -> all three ranges sent in one LoadAttachmentRanges call. Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_multi_{}", OpJobIndex++), {}); - LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = ImportCidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = nullptr, .CacheBuildId = CacheBuildId, @@ -5163,7 +5226,8 @@ TEST_CASE("project.store.import.context_settings") SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash); Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_single_{}", OpJobIndex++), {}); - LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = ImportCidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = nullptr, .CacheBuildId = CacheBuildId, @@ -5194,7 +5258,8 @@ TEST_CASE("project.store.import.context_settings") // Phase 1: full block download from remote populates the cache. { Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p1_{}", OpJobIndex++), {}); - LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = ImportCidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, @@ -5226,7 +5291,8 @@ TEST_CASE("project.store.import.context_settings") SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p2_{}", OpJobIndex++), {}); - LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = Phase2CidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, @@ -5259,7 +5325,8 @@ TEST_CASE("project.store.import.context_settings") // Phase 1: full block download from remote into cache. { Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p1_{}", OpJobIndex++), {}); - LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = ImportCidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, @@ -5291,7 +5358,8 @@ TEST_CASE("project.store.import.context_settings") SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p2_{}", OpJobIndex++), {}); - LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = Phase2CidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, @@ -5373,7 +5441,8 @@ RunSaveOplog(CidStore& CidStore, { *OutRemoteStore = RemoteStore; } - SaveOplog(CidStore, + SaveOplog(Log(), + CidStore, *RemoteStore, Project, Oplog, @@ -5476,7 +5545,8 @@ TEST_CASE("project.store.embed_loose_files_true") /*ForceDisableBlocks=*/false, &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_true_import", {}); - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -5530,7 +5600,8 @@ TEST_CASE("project.store.embed_loose_files_false" * doctest::skip()) // superse &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_false_import", {}); - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -5693,7 +5764,8 @@ TEST_CASE("project.store.export.large_file_attachment_direct") &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_direct_import", {}); - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -5750,7 +5822,8 @@ TEST_CASE("project.store.export.large_file_attachment_via_temp") &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_via_temp_import", {}); - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -5804,7 +5877,8 @@ TEST_CASE("project.store.export.large_chunk_from_cidstore") &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_cid_import", {}); - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -5867,7 +5941,8 @@ TEST_CASE("project.store.export.block_reuse") BlockHashesAfterFirst.push_back(B.BlockHash); } - SaveOplog(CidStore, + SaveOplog(Log(), + CidStore, *RemoteStore, *Project, *Oplog, @@ -5944,7 +6019,8 @@ TEST_CASE("project.store.export.max_chunks_per_block") CHECK(KnownBlocks.Blocks.size() >= 2); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_max_chunks_import", {}); - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -6027,7 +6103,8 @@ TEST_CASE("project.store.export.max_data_per_block") CHECK(KnownBlocks.Blocks.size() >= 2); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_max_data_per_block_import", {}); - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -6155,7 +6232,8 @@ TEST_CASE("project.store.embed_loose_files_zero_data_hash") &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_zero_data_hash_import", {}); - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -6209,7 +6287,8 @@ TEST_CASE("project.store.embed_loose_files_already_resolved") &RemoteStore1); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_already_resolved_import", {}); - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore1, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -6296,7 +6375,8 @@ TEST_CASE("project.store.import.missing_attachment") Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_throw", {}); REQUIRE(ImportOplog); CapturingJobContext Ctx; - CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -6313,7 +6393,8 @@ TEST_CASE("project.store.import.missing_attachment") Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_ignore", {}); REQUIRE(ImportOplog); CapturingJobContext Ctx; - CHECK_NOTHROW(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + CHECK_NOTHROW(LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -6358,7 +6439,8 @@ TEST_CASE("project.store.import.error.load_container_failure") REQUIRE(ImportOplog); CapturingJobContext Ctx; - CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -6785,6 +6867,7 @@ TEST_CASE("buildcontainer.public_overload_smoke") std::atomic<int> BlockCallCount{0}; CbObject Container = BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -6828,6 +6911,7 @@ TEST_CASE("buildcontainer.build_blocks_false_on_block_chunks") std::atomic<int> BlockChunksCallCount{0}; CbObject Container = BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -6893,6 +6977,7 @@ TEST_CASE("buildcontainer.ignore_missing_binary_attachment_warn") { CapturingJobContext Ctx; BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -6916,6 +7001,7 @@ TEST_CASE("buildcontainer.ignore_missing_binary_attachment_warn") SUBCASE("throw") { CHECK_THROWS(BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -6967,6 +7053,7 @@ TEST_CASE("buildcontainer.ignore_missing_file_attachment_warn") { CapturingJobContext Ctx; BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -6990,6 +7077,7 @@ TEST_CASE("buildcontainer.ignore_missing_file_attachment_warn") SUBCASE("throw") { CHECK_THROWS(BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -7008,6 +7096,61 @@ TEST_CASE("buildcontainer.ignore_missing_file_attachment_warn") } } +TEST_CASE("buildcontainer.zero_byte_file_attachment") +{ + // A zero-byte file on disk is a valid attachment. BuildContainer must process + // it without hitting ZEN_ASSERT(UploadAttachment->Size != 0) in + // ResolveAttachments. The empty file flows through the compress-inline path + // and becomes a LooseUploadAttachment with raw size 0. + using namespace projectstore_testutils; + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + std::filesystem::path RootDir = TempDir.Path() / "root"; + auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list<size_t>{512}); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_zero_byte_file", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); + + // Truncate the file to zero bytes after the oplog entry is created. + // The file still exists on disk so RewriteOplog's IsFile() check passes, + // but MakeFromFile returns a zero-size buffer. + std::filesystem::resize_file(FileAtts[0].second, 0); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + CbObject Container = BuildContainer( + Log(), + CidStore, + *Project, + *Oplog, + WorkerPool, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/true); + + CHECK(Container.GetSize() > 0); + + // The zero-byte attachment is packed into a block via the compress-inline path. + CbArrayView Blocks = Container["blocks"sv].AsArrayView(); + CHECK(Blocks.Num() > 0); +} + TEST_CASE("buildcontainer.embed_loose_files_false_no_rewrite") { // EmbedLooseFiles=false: RewriteOp is skipped for file-op entries; they pass through @@ -7030,6 +7173,7 @@ TEST_CASE("buildcontainer.embed_loose_files_false_no_rewrite") WorkerThreadPool WorkerPool(GetWorkerCount()); CbObject Container = BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -7080,6 +7224,7 @@ TEST_CASE("buildcontainer.allow_chunking_false") { std::atomic<int> LargeAttachmentCallCount{0}; BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -7103,6 +7248,7 @@ TEST_CASE("buildcontainer.allow_chunking_false") // Chunking branch in FindChunkSizes is taken, but the ~4 KB chunk still exceeds MaxChunkEmbedSize -> OnLargeAttachment. std::atomic<int> LargeAttachmentCallCount{0}; BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -7144,6 +7290,7 @@ TEST_CASE("buildcontainer.async_on_block_exception_propagates") WorkerThreadPool WorkerPool(GetWorkerCount()); CHECK_THROWS_AS(BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -7184,6 +7331,7 @@ TEST_CASE("buildcontainer.on_large_attachment_exception_propagates") WorkerThreadPool WorkerPool(GetWorkerCount()); CHECK_THROWS_AS(BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -7226,6 +7374,7 @@ TEST_CASE("buildcontainer.context_cancellation_aborts") Ctx.m_Cancel = true; CHECK_NOTHROW(BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -7265,6 +7414,7 @@ TEST_CASE("buildcontainer.context_progress_reporting") CapturingJobContext Ctx; BuildContainer( + Log(), CidStore, *Project, *Oplog, @@ -7428,7 +7578,8 @@ TEST_CASE("loadoplog.missing_block_attachment_ignored") CapturingJobContext Ctx; Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_block_import", {}); - CHECK_NOTHROW(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + CHECK_NOTHROW(LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, @@ -7501,7 +7652,8 @@ TEST_CASE("loadoplog.clean_oplog_with_populated_cache") { Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog("oplog_clean_cache_p1", {}); - LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = ImportCidStore, .RemoteStore = *RemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, @@ -7517,7 +7669,8 @@ TEST_CASE("loadoplog.clean_oplog_with_populated_cache") { Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog("oplog_clean_cache_p2", {}); - CHECK_NOTHROW(LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + CHECK_NOTHROW(LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = ImportCidStore, .RemoteStore = *RemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, @@ -7532,6 +7685,158 @@ TEST_CASE("loadoplog.clean_oplog_with_populated_cache") } } +TEST_CASE("project.store.export.block_reuse_fresh_receiver") +{ + // Regression test: after a second export that reuses existing blocks, a fresh import must still + // receive all chunks. The bug: FindReuseBlocks erases reused-block chunks from UploadAttachments, + // but never adds the reused blocks to the container's "blocks" section. A fresh receiver then + // silently misses those chunks because ParseOplogContainer never sees them. + using namespace projectstore_testutils; + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + // -- Export side ---------------------------------------------------------- + GcManager ExportGc; + CidStore ExportCidStore(ExportGc); + CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + ExportCidStore.Initialize(ExportCidConfig); + + std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore"; + ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{}); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + Ref<ProjectStore::Project> ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + + // 20 KB with None encoding: compressed ~ 20 KB < MaxChunkEmbedSize (32 KB) -> packed into blocks. + Ref<ProjectStore::Oplog> Oplog = ExportProject->NewOplog("oplog_reuse_rt", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( + Oid::NewOid(), + CreateAttachments(std::initializer_list<size_t>{20u * 1024u, 20u * 1024u}, OodleCompressionLevel::None))); + + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; + + constexpr size_t MaxBlockSize = 64u * 1024u; + constexpr size_t MaxChunksPerBlock = 1000; + constexpr size_t MaxChunkEmbedSize = 32u * 1024u; + constexpr size_t ChunkFileSizeLimit = 64u * 1024u * 1024u; + + // First export: creates blocks on disk. + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, + .MaxChunksPerBlock = MaxChunksPerBlock, + .MaxChunkEmbedSize = MaxChunkEmbedSize, + .ChunkFileSizeLimit = ChunkFileSizeLimit}, + /*.FolderPath =*/ExportDir.Path(), + /*.Name =*/std::string("oplog_reuse_rt"), + /*.OptionalBaseName =*/std::string(), + /*.ForceDisableBlocks =*/false, + /*.ForceEnableTempBlocks =*/false}; + + std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options); + SaveOplog(Log(), + ExportCidStore, + *RemoteStore, + *ExportProject, + *Oplog, + NetworkPool, + WorkerPool, + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + /*EmbedLooseFiles*/ true, + /*ForceUpload*/ false, + /*IgnoreMissingAttachments*/ false, + /*OptionalContext*/ nullptr); + + // Verify first export produced blocks. + RemoteProjectStore::GetKnownBlocksResult KnownAfterFirst = RemoteStore->GetKnownBlocks(); + REQUIRE(!KnownAfterFirst.Blocks.empty()); + + // Second export to the SAME store: triggers block reuse via GetKnownBlocks. + SaveOplog(Log(), + ExportCidStore, + *RemoteStore, + *ExportProject, + *Oplog, + NetworkPool, + WorkerPool, + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + /*EmbedLooseFiles*/ true, + /*ForceUpload*/ false, + /*IgnoreMissingAttachments*/ false, + /*OptionalContext*/ nullptr); + + // Verify the container has no duplicate block entries. + { + RemoteProjectStore::LoadContainerResult ContainerResult = RemoteStore->LoadContainer(); + REQUIRE(ContainerResult.ErrorCode == 0); + std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(ContainerResult.ContainerObject); + REQUIRE(!BlockHashes.empty()); + std::unordered_set<IoHash, IoHash::Hasher> UniqueBlockHashes(BlockHashes.begin(), BlockHashes.end()); + CHECK(UniqueBlockHashes.size() == BlockHashes.size()); + } + + // Collect all attachment hashes referenced by the oplog ops. + std::unordered_set<IoHash, IoHash::Hasher> ExpectedHashes; + Oplog->IterateOplogWithKey([&](int, const Oid&, CbObjectView Op) { + Op.IterateAttachments([&](CbFieldView FieldView) { ExpectedHashes.insert(FieldView.AsAttachment()); }); + }); + REQUIRE(!ExpectedHashes.empty()); + + // -- Import side (fresh, empty CAS) -------------------------------------- + GcManager ImportGc; + CidStore ImportCidStore(ImportGc); + CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + ImportCidStore.Initialize(ImportCidConfig); + + std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore"; + ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{}); + Ref<ProjectStore::Project> ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + + Ref<ProjectStore::Oplog> ImportOplog = ImportProject->NewOplog("oplog_reuse_rt_import", {}); + REQUIRE(ImportOplog); + + LoadOplog(LoadOplogContext{.Log = Log(), + .ChunkStore = ImportCidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All}); + + // Every attachment hash from the original oplog must be present in the import CAS. + for (const IoHash& Hash : ExpectedHashes) + { + CHECK_MESSAGE(ImportCidStore.ContainsChunk(Hash), "Missing chunk after import: ", Hash); + } +} + TEST_SUITE_END(); #endif // ZEN_WITH_TESTS |