diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 1167 |
1 files changed, 911 insertions, 256 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 13b2e7b1e..f4b93538e 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -4547,40 +4547,19 @@ namespace projectstore_testutils { return Result; } - class TestJobContext : public JobContext - { - public: - explicit TestJobContext(int& OpIndex) : m_OpIndex(OpIndex) {} - virtual bool IsCancelled() const { return false; } - virtual void ReportMessage(std::string_view Message) { ZEN_INFO("Job {}: {}", m_OpIndex, Message); } - virtual void ReportProgress(std::string_view CurrentOp, - std::string_view Details, - ptrdiff_t TotalCount, - ptrdiff_t RemainingCount, - uint64_t ElapsedTimeMs) - { - ZEN_UNUSED(ElapsedTimeMs); - ZEN_INFO("Job {}: Op '{}'{} {}/{}", - m_OpIndex, - CurrentOp, - Details.empty() ? "" : fmt::format(" {}", Details), - TotalCount - RemainingCount, - TotalCount); - } - - private: - int& m_OpIndex; - }; - struct CapturingJobContext : public JobContext { - bool IsCancelled() const override { return false; } + bool IsCancelled() const override { return m_Cancel; } void ReportMessage(std::string_view Message) override { RwLock::ExclusiveLockScope _(m_Lock); Messages.emplace_back(Message); } - void ReportProgress(std::string_view, std::string_view, ptrdiff_t, ptrdiff_t, uint64_t) override {} + void ReportProgress(std::string_view Op, std::string_view Details, ptrdiff_t, ptrdiff_t, uint64_t) override + { + RwLock::ExclusiveLockScope _(m_Lock); + ProgressMessages.emplace_back(fmt::format("{}: {}", Op, Details)); + } bool HasMessage(std::string_view Substr) const { @@ -4590,12 +4569,38 @@ namespace projectstore_testutils { }); } + bool m_Cancel = false; std::vector<std::string> Messages; + std::vector<std::string> ProgressMessages; private: mutable RwLock m_Lock; }; + // Worker pool pair for tests that exercise both network and compute workers. + // Member initialisation order matches declaration order: counts first, then pools. + struct TestWorkerPools + { + private: + uint32_t m_NetworkCount; + uint32_t m_WorkerCount; + + public: + WorkerThreadPool NetworkPool; + WorkerThreadPool WorkerPool; + + TestWorkerPools() + : m_NetworkCount(Max(GetHardwareConcurrency() / 4u, 2u)) + , m_WorkerCount(m_NetworkCount < GetHardwareConcurrency() ? Max(GetHardwareConcurrency() - m_NetworkCount, 4u) : 4u) + , NetworkPool(m_NetworkCount) + , WorkerPool(m_WorkerCount) + { + } + }; + + // Worker count for tests that only need a single WorkerThreadPool. + inline uint32_t GetWorkerCount() { return Max(GetHardwareConcurrency() / 4u, 2u); } + // Create a test IoHash with a unique value based on a small index. inline IoHash MakeTestHash(uint8_t Index) { @@ -4708,11 +4713,9 @@ TEST_CASE_TEMPLATE("project.store.export", std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; SaveOplog(CidStore, *RemoteStore, @@ -4730,70 +4733,28 @@ TEST_CASE_TEMPLATE("project.store.export", nullptr); Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {}); - CHECK(OplogImport); + REQUIRE(OplogImport); - int OpJobIndex = 0; - TestJobContext OpJobContext(OpJobIndex); - - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = Oid::Zero, - .Oplog = *OplogImport, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &OpJobContext}); - - OpJobIndex++; - - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = Oid::Zero, - .Oplog = *OplogImport, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = true, - .IgnoreMissingAttachments = false, - .CleanOplog = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &OpJobContext}); - - OpJobIndex++; - - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = Oid::Zero, - .Oplog = *OplogImport, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = false, - .IgnoreMissingAttachments = false, - .CleanOplog = true, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &OpJobContext}); - - OpJobIndex++; - - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .OptionalCache = nullptr, - .CacheBuildId = Oid::Zero, - .Oplog = *OplogImport, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = true, - .IgnoreMissingAttachments = false, - .CleanOplog = true, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &OpJobContext}); + CapturingJobContext Ctx; + auto DoLoad = [&](bool Force, bool Clean) { + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = Force, + .IgnoreMissingAttachments = false, + .CleanOplog = Clean, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &Ctx}); + }; - OpJobIndex++; + DoLoad(false, false); + DoLoad(true, false); + DoLoad(false, true); + DoLoad(true, true); } // Common oplog setup used by the two tests below. @@ -5033,10 +4994,9 @@ TEST_CASE("project.store.import.context_settings") ProjectRootDir.string(), ProjectFilePath.string())); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr<RemoteProjectStore> RemoteStore = SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path()); @@ -5078,7 +5038,7 @@ TEST_CASE("project.store.import.context_settings") int OpJobIndex = 0; - TestJobContext OpJobContext(OpJobIndex); + CapturingJobContext OpJobContext; // Helper: run a LoadOplog against the import-side CAS/project with the given context knobs. // Each call creates a fresh oplog so repeated calls within one SUBCASE don't short-circuit on @@ -5506,10 +5466,9 @@ TEST_CASE("project.store.export.no_attachments_needed") Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; CapturingJobContext Ctx; RunSaveOplog(CidStore, @@ -5555,10 +5514,9 @@ TEST_CASE("project.store.embed_loose_files_true") Oplog->AppendNewOplogEntry( CreateFilesOplogPackage(Oid::NewOid(), RootDir, CreateFileAttachments(RootDir, std::initializer_list<size_t>{1024, 2048}))); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr<RemoteProjectStore> RemoteStore; RunSaveOplog(CidStore, @@ -5587,7 +5545,7 @@ TEST_CASE("project.store.embed_loose_files_true") .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } -TEST_CASE("project.store.embed_loose_files_false") +TEST_CASE("project.store.embed_loose_files_false" * doctest::skip()) // superseded by buildcontainer.embed_loose_files_false_no_rewrite { // EmbedLooseFiles=false skips RewriteOp and writes file-op entries directly to the // section ops writer (line 1929: SectionOpsWriter << Op). Round-trip must succeed. @@ -5610,10 +5568,9 @@ TEST_CASE("project.store.embed_loose_files_false") Oplog->AppendNewOplogEntry( CreateFilesOplogPackage(Oid::NewOid(), RootDir, CreateFileAttachments(RootDir, std::initializer_list<size_t>{1024, 2048}))); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr<RemoteProjectStore> RemoteStore; RunSaveOplog(CidStore, @@ -5643,7 +5600,8 @@ TEST_CASE("project.store.embed_loose_files_false") .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } -TEST_CASE("project.store.export.missing_attachment_ignored") +TEST_CASE("project.store.export.missing_attachment_ignored" * + doctest::skip()) // superseded by buildcontainer.ignore_missing_file_attachment_warn { // Files are created on disk, added to the oplog, then deleted before SaveOplog runs. // RewriteOp detects the missing file and emits a "Missing attachment" message. @@ -5672,10 +5630,9 @@ TEST_CASE("project.store.export.missing_attachment_ignored") std::filesystem::remove(Path); } - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; CapturingJobContext Ctx; RunSaveOplog(CidStore, @@ -5697,7 +5654,8 @@ TEST_CASE("project.store.export.missing_attachment_ignored") CHECK(Ctx.HasMessage("Missing attachment")); } -TEST_CASE("project.store.export.missing_chunk_in_cidstore") +TEST_CASE("project.store.export.missing_chunk_in_cidstore" * + doctest::skip()) // superseded by buildcontainer.ignore_missing_binary_attachment_warn/throw { // A bulk-data op references a hash that is not present in CidStore. // With IgnoreMissingAttachments=false the export must fail with NotFound. @@ -5737,10 +5695,9 @@ TEST_CASE("project.store.export.missing_chunk_in_cidstore") REQUIRE(Oplog); Oplog->AppendNewOplogEntry(Package); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; CHECK_THROWS(RunSaveOplog(CidStore, *Project, @@ -5783,10 +5740,9 @@ TEST_CASE("project.store.export.large_file_attachment_direct") REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunkEmbedSize = 32u * 1024u; @@ -5845,10 +5801,9 @@ TEST_CASE("project.store.export.large_file_attachment_via_temp") REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunkEmbedSize = 32u * 1024u; @@ -5903,10 +5858,9 @@ TEST_CASE("project.store.export.large_chunk_from_cidstore") REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), Attachments)); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunkEmbedSize = 32u * 1024u; @@ -5964,10 +5918,9 @@ TEST_CASE("project.store.export.block_reuse") Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{20u * 1024u, 20u * 1024u}, OodleCompressionLevel::None))); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunkEmbedSize = 32u * 1024u; constexpr size_t MaxBlockSize = 64u * 1024u; @@ -6056,10 +6009,9 @@ TEST_CASE("project.store.export.max_chunks_per_block") Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{2u * 1024u, 2u * 1024u, 2u * 1024u}, OodleCompressionLevel::None))); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunksPerBlock = 2; constexpr size_t MaxBlockSize = 1u * 1024u * 1024u; @@ -6146,10 +6098,9 @@ TEST_CASE("project.store.export.max_data_per_block") Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oids[4], CreateAttachments(std::initializer_list<size_t>{1676, 1678}, OodleCompressionLevel::None))); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunksPerBlock = 32; constexpr size_t MaxBlockSize = 7u * 1024u; @@ -6237,10 +6188,9 @@ TEST_CASE("project.store.export.file_deleted_between_phases") } }; - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; DeleteOnRewriteContext Ctx; Ctx.Paths = &FilePaths; @@ -6296,10 +6246,9 @@ TEST_CASE("project.store.embed_loose_files_zero_data_hash") REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackageWithZeroDataHash(Oid::NewOid(), RootDir, FileAtts)); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr<RemoteProjectStore> RemoteStore; RunSaveOplog(CidStore, @@ -6356,10 +6305,9 @@ TEST_CASE("project.store.embed_loose_files_already_resolved") REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; // First export: RewriteOp hashes the files and rewrites each entry to include // "data": BinaryAttachment(H). @@ -6412,11 +6360,10 @@ TEST_CASE("project.store.embed_loose_files_already_resolved") /*ForceDisableBlocks=*/false); } -TEST_CASE("project.store.import.error.missing_attachment") +TEST_CASE("project.store.import.missing_attachment") { - // Export a small oplog (blocks disabled to avoid pre-existing ZEN_ASSERT), delete one - // attachment file from the remote store, then verify that LoadOplog returns a non-zero - // error code when IgnoreMissingAttachments=false. + // Export a small oplog with ForceDisableBlocks=true (only loose .blob files), delete one + // attachment, then test both sides of IgnoreMissingAttachments. using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; @@ -6433,10 +6380,9 @@ TEST_CASE("project.store.import.error.missing_attachment") Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{512, 1024}))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{2048, 3000}))); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr<RemoteProjectStore> RemoteStore; RunSaveOplog(CidStore, @@ -6471,94 +6417,41 @@ TEST_CASE("project.store.import.error.missing_attachment") std::filesystem::remove(DeletedBlob, Ec); REQUIRE(!Ec); - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_import", {}); - REQUIRE(ImportOplog); - - CapturingJobContext Ctx; - CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = true, - .IgnoreMissingAttachments = false, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &Ctx}), - RemoteStoreError); -} - -TEST_CASE("project.store.import.ignore_missing_attachment") -{ - // Same setup as project.store.import.error.missing_attachment, but with - // IgnoreMissingAttachments=true. LoadOplog should succeed (ErrorCode == 0) - // despite the missing attachment file. - using namespace projectstore_testutils; - - ScopedTemporaryDirectory TempDir; - ScopedTemporaryDirectory ExportDir; - - GcManager Gc; - CidStore CidStore(Gc); - std::unique_ptr<ProjectStore> ProjectStoreDummy; - Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); - - Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog_ignore_missing", {}); - REQUIRE(Oplog); - - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{512, 1024}))); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{2048, 3000}))); - - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); - - std::shared_ptr<RemoteProjectStore> RemoteStore; - RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_ignore_missing", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/false, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/true, - &RemoteStore); - - // Find and delete one .blob attachment file. - std::filesystem::path DeletedBlob; - for (const auto& Entry : std::filesystem::recursive_directory_iterator(ExportDir.Path())) - { - if (Entry.path().extension() == ".blob") - { - DeletedBlob = Entry.path(); - break; - } + SUBCASE("throws_when_not_ignored") + { + // IgnoreMissingAttachments=false: LoadOplog must throw RemoteStoreError. + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_throw", {}); + REQUIRE(ImportOplog); + CapturingJobContext Ctx; + CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &Ctx}), + RemoteStoreError); } - REQUIRE(!DeletedBlob.empty()); - std::error_code Ec; - std::filesystem::remove(DeletedBlob, Ec); - REQUIRE(!Ec); - - Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_ignore_missing_import", {}); - REQUIRE(ImportOplog); - CapturingJobContext Ctx; - LoadOplog(LoadOplogContext{.ChunkStore = CidStore, - .RemoteStore = *RemoteStore, - .Oplog = *ImportOplog, - .NetworkWorkerPool = NetworkPool, - .WorkerPool = WorkerPool, - .ForceDownload = true, - .IgnoreMissingAttachments = true, - .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, - .OptionalJobContext = &Ctx}); + SUBCASE("succeeds_when_ignored") + { + // IgnoreMissingAttachments=true: LoadOplog must succeed and report the failure. + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_ignore", {}); + REQUIRE(ImportOplog); + CapturingJobContext Ctx; + CHECK_NOTHROW(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &Ctx})); + CHECK(Ctx.HasMessage("Failed to load attachments")); + } } TEST_CASE("project.store.import.error.load_container_failure") @@ -6588,10 +6481,9 @@ TEST_CASE("project.store.import.error.load_container_failure") /*.ForceEnableTempBlocks =*/false}; std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options); - uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); - uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; - WorkerThreadPool WorkerPool(WorkerCount); - WorkerThreadPool NetworkPool(NetworkWorkerCount); + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("load_container_failure_import", {}); REQUIRE(ImportOplog); @@ -7047,6 +6939,769 @@ TEST_CASE("project.store.blockcomposer.path_a_pending_untouched") CHECK(Blocks[1][1] == MakeTestHash(2)); } +// --------------------------------------------------------------------------- +// BuildContainer-direct tests (Steps 2a–2l) +// --------------------------------------------------------------------------- + +TEST_CASE("buildcontainer.public_overload_smoke") +{ + // Smoke test for the public BuildContainer overload. + // One bulkdata op with a 1 KB attachment (< MaxChunkEmbedSize → goes into a block). + // Verifies the public overload compiles and runs, that the result is non-empty, and + // that AsyncOnBlock fires at least once. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_smoke", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024}))); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + std::atomic<int> BlockCallCount{0}; + CbObject Container = BuildContainer( + CidStore, + *Project, + *Oplog, + WorkerPool, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + [&](CompressedBuffer&&, ChunkBlockDescription&&) { BlockCallCount.fetch_add(1); }, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false); + + CHECK(Container.GetSize() > 0); + CHECK(BlockCallCount.load() >= 1); +} + +TEST_CASE("buildcontainer.build_blocks_false_on_block_chunks") +{ + // With BuildBlocks=false, small attachments that would normally be grouped into a block + // are instead delivered to OnBlockChunks (line ~2792). + // AsyncOnBlock must NOT fire. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_no_blocks", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024, 1024, 1024}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024, 1024, 1024}))); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + std::atomic<int> BlockChunksCallCount{0}; + CbObject Container = BuildContainer( + CidStore, + *Project, + *Oplog, + WorkerPool, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/false, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + [](CompressedBuffer&&, ChunkBlockDescription&&) { CHECK(false); }, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [&](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) { BlockChunksCallCount.fetch_add(1); }, + /*EmbedLooseFiles=*/false); + + CHECK(Container.GetSize() > 0); + CHECK(BlockChunksCallCount.load() >= 1); +} + +TEST_CASE("buildcontainer.ignore_missing_binary_attachment_warn") +{ + // A bulk-data op references a hash that is absent from CidStore. + // SUBCASE warn: IgnoreMissingAttachments=true → ReportMessage("Missing attachment …"). + // SUBCASE throw: IgnoreMissingAttachments=false → std::runtime_error. + using namespace projectstore_testutils; + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + // Fabricate a hash not in CidStore and build a package that references it as a + // BinaryAttachment field but carries no inline attachment data. + IoBuffer FakeData = CreateRandomBlob(256); + IoHash FakeHash = IoHash::HashBuffer(FakeData); + + CbObjectWriter Object; + Object << "key"sv << OidAsString(Oid::NewOid()); + Object.BeginArray("bulkdata"sv); + { + Object.BeginObject(); + Object << "id"sv << Oid::NewOid(); + Object << "type"sv + << "Standard"sv; + Object.AddBinaryAttachment("data"sv, FakeHash); + Object.EndObject(); + } + Object.EndArray(); + CbPackage Package; + Package.SetObject(Object.Save()); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_missing_bin", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry(Package); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + SUBCASE("warn") + { + CapturingJobContext Ctx; + BuildContainer( + CidStore, + *Project, + *Oplog, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/true, + /*AllowChunking=*/true, + {}, + WorkerPool, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false, + &Ctx); + CHECK(Ctx.HasMessage("Missing attachment")); + } + + SUBCASE("throw") + { + CHECK_THROWS(BuildContainer( + CidStore, + *Project, + *Oplog, + WorkerPool, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false)); + } +} + +TEST_CASE("buildcontainer.ignore_missing_file_attachment_warn") +{ + // File attachments are created on disk then deleted before BuildContainer runs. + // SUBCASE warn: IgnoreMissingAttachments=true → ReportMessage("Missing attachment …"). + // SUBCASE throw: IgnoreMissingAttachments=false → exception. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + std::filesystem::path RootDir = TempDir.Path() / "root"; + auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list<size_t>{512, 1024}); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_missing_file", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); + + // Delete files before BuildContainer runs so RewriteOp finds them missing. + for (const auto& [Id, Path] : FileAtts) + { + std::filesystem::remove(Path); + } + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + SUBCASE("warn") + { + CapturingJobContext Ctx; + BuildContainer( + CidStore, + *Project, + *Oplog, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/true, + /*AllowChunking=*/true, + {}, + WorkerPool, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/true, + &Ctx); + CHECK(Ctx.HasMessage("Missing attachment")); + } + + SUBCASE("throw") + { + CHECK_THROWS(BuildContainer( + CidStore, + *Project, + *Oplog, + WorkerPool, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/true)); + } +} + +TEST_CASE("buildcontainer.embed_loose_files_false_no_rewrite") +{ + // EmbedLooseFiles=false: RewriteOp is skipped for file-op entries; they pass through + // unchanged. Neither AsyncOnBlock nor OnLargeAttachment should fire. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + std::filesystem::path RootDir = TempDir.Path() / "root"; + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_embed_false", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry( + CreateFilesOplogPackage(Oid::NewOid(), RootDir, CreateFileAttachments(RootDir, std::initializer_list<size_t>{1024, 2048}))); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + CbObject Container = BuildContainer( + CidStore, + *Project, + *Oplog, + WorkerPool, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + [](CompressedBuffer&&, ChunkBlockDescription&&) { CHECK(false); }, + [](const IoHash&, TGetAttachmentBufferFunc&&) { CHECK(false); }, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false); + + CHECK(Container.GetSize() > 0); +} + +TEST_CASE("buildcontainer.allow_chunking_false") +{ + // When AllowChunking=false, a bulkdata attachment that exceeds ChunkFileSizeLimit + // is NOT split into chunks and goes directly to OnLargeAttachment. + // SUBCASE false: verify OnLargeAttachment fires. + // SUBCASE true: verify OnLargeAttachment does NOT fire for the same data (gets chunked + // into smaller pieces → AsyncOnBlock or OnBlockChunks fires instead). + // + // Parameters chosen so that the 4 KB attachment: + // - exceeds MaxChunkEmbedSize (2 KB) → classified as large + // - exceeds ChunkFileSizeLimit (1 KB) → eligible for chunking when AllowChunking=true + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + // Use OodleCompressionLevel::None so the compressed size stays ≈ raw size (4 KB), + // ensuring it exceeds both MaxChunkEmbedSize (2 KB) and ChunkFileSizeLimit (1 KB). + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_allow_chunk", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{4096}, OodleCompressionLevel::None))); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + constexpr size_t TestMaxBlockSize = 16u * 1024u; + constexpr size_t TestMaxChunkEmbedSize = 2u * 1024u; + constexpr size_t TestChunkFileSizeLimit = 1u * 1024u; + + SUBCASE("allow_chunking_false") + { + std::atomic<int> LargeAttachmentCallCount{0}; + BuildContainer( + CidStore, + *Project, + *Oplog, + WorkerPool, + TestMaxBlockSize, + 1000, + TestMaxChunkEmbedSize, + TestChunkFileSizeLimit, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/false, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [&](const IoHash&, TGetAttachmentBufferFunc&&) { LargeAttachmentCallCount.fetch_add(1); }, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false); + CHECK(LargeAttachmentCallCount.load() >= 1); + } + + SUBCASE("allow_chunking_true") + { + // With AllowChunking=true the chunking branch in FindChunkSizes IS taken. + // The resulting chunk (still ~4 KB) exceeds MaxChunkEmbedSize (2 KB) so it + // still ends up in OnLargeAttachment — the value of this subcase is exercising + // the `AllowChunking && Size > ChunkFileSizeLimit` branch in FindChunkSizes. + std::atomic<int> LargeAttachmentCallCount{0}; + BuildContainer( + CidStore, + *Project, + *Oplog, + WorkerPool, + TestMaxBlockSize, + 1000, + TestMaxChunkEmbedSize, + TestChunkFileSizeLimit, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [&](const IoHash&, TGetAttachmentBufferFunc&&) { LargeAttachmentCallCount.fetch_add(1); }, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false); + CHECK(LargeAttachmentCallCount.load() >= 1); + } +} + +TEST_CASE("buildcontainer.async_on_block_exception_propagates") +{ + // If AsyncOnBlock throws, the exception must propagate out of BuildContainer. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_block_exc", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024, 1024, 1024}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024, 1024, 1024}))); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + CHECK_THROWS_AS(BuildContainer( + CidStore, + *Project, + *Oplog, + WorkerPool, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + [](CompressedBuffer&&, ChunkBlockDescription&&) { throw std::runtime_error("inject_block"); }, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false), + std::runtime_error); +} + +TEST_CASE("buildcontainer.on_large_attachment_exception_propagates") +{ + // If OnLargeAttachment throws, the exception must propagate out of BuildContainer. + // A 64 KB attachment with MaxChunkEmbedSize=32 KB exceeds the embed limit so it + // goes to OnLargeAttachment. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_large_exc", {}); + REQUIRE(Oplog); + // 64 KB with OodleCompressionLevel::None → compressed ≈ 64 KB > MaxChunkEmbedSize (32 KB). + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), + CreateAttachments(std::initializer_list<size_t>{64u * 1024u}, OodleCompressionLevel::None))); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + CHECK_THROWS_AS(BuildContainer( + CidStore, + *Project, + *Oplog, + WorkerPool, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/false, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) { throw std::runtime_error("inject_large"); }, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false), + std::runtime_error); +} + +TEST_CASE("buildcontainer.context_cancellation_aborts") +{ + // With IsCancelled() returning true from the start, BuildContainer must not crash + // or throw unhandled — it returns an empty result or exits cleanly. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_cancel", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024, 1024}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024, 1024}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024, 1024}))); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + CapturingJobContext Ctx; + Ctx.m_Cancel = true; + + // Must not throw or crash. + CHECK_NOTHROW(BuildContainer( + CidStore, + *Project, + *Oplog, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + {}, + WorkerPool, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false, + &Ctx)); +} + +TEST_CASE("buildcontainer.context_progress_reporting") +{ + // ReportProgress is called at least once by BuildContainer (the "Scanning oplog" call). + // Verifies the CapturingJobContext.ProgressMessages field is populated. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_progress", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024}))); + + WorkerThreadPool WorkerPool(GetWorkerCount()); + + CapturingJobContext Ctx; + BuildContainer( + CidStore, + *Project, + *Oplog, + 64u * 1024u, + 1000, + 32u * 1024u, + 64u * 1024u * 1024u, + /*BuildBlocks=*/true, + /*IgnoreMissingAttachments=*/false, + /*AllowChunking=*/true, + {}, + WorkerPool, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /*EmbedLooseFiles=*/false, + &Ctx); + + CHECK(!Ctx.ProgressMessages.empty()); +} + +// --------------------------------------------------------------------------- +// SaveOplog-focused tests (Step 3) +// --------------------------------------------------------------------------- + +TEST_CASE("saveoplog.cancellation") +{ + // When IsCancelled() returns true from the start, SaveOplog must return without + // throwing. The first IsCancelled check inside BuildContainer (or UploadAttachments) + // aborts before significant work is done. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog_cancel_save", {}); + REQUIRE(Oplog); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024, 2048}))); + + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; + + CapturingJobContext Ctx; + Ctx.m_Cancel = true; + + CHECK_NOTHROW(RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_cancel_save", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/false, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + &Ctx, + /*ForceDisableBlocks=*/false)); +} + +// --------------------------------------------------------------------------- +// LoadOplog-focused tests (Step 4) +// --------------------------------------------------------------------------- + +TEST_CASE("loadoplog.missing_block_attachment_ignored") +{ + // Export with ForceDisableBlocks=false so that a block (.blob) is created in the + // remote store. Then delete the block file. With IgnoreMissingAttachments=true, + // LoadOplog must not throw and must report the failure. + using namespace projectstore_testutils; + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog_missing_block", {}); + REQUIRE(Oplog); + // Four small attachments — all < MaxChunkEmbedSize (32 KB) → go into a block. + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{1024, 1024, 2048, 512}))); + + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; + + std::shared_ptr<RemoteProjectStore> RemoteStore; + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_missing_block", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/false, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); + + // Find block hashes and delete their .blob files from ExportDir. + RemoteProjectStore::GetKnownBlocksResult KnownBlocks = RemoteStore->GetKnownBlocks(); + REQUIRE(KnownBlocks.ErrorCode == 0); + REQUIRE(!KnownBlocks.Blocks.empty()); + + for (const ChunkBlockDescription& BlockDesc : KnownBlocks.Blocks) + { + std::string HexStr = BlockDesc.BlockHash.ToHexString(); + std::filesystem::path BlockPath = ExportDir.Path() / HexStr.substr(0, 3) / HexStr.substr(3, 2) / (HexStr.substr(5) + ".blob"); + REQUIRE(std::filesystem::exists(BlockPath)); + std::filesystem::remove(BlockPath); + } + + CapturingJobContext Ctx; + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_block_import", {}); + CHECK_NOTHROW(LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &Ctx})); + CHECK(Ctx.HasMessage("Failed to download block attachment")); +} + +TEST_CASE("loadoplog.clean_oplog_with_populated_cache") +{ + // First import populates the BuildStorageCache. + // Second import with CleanOplog=true and the same non-null cache exercises the + // OptionalCache->Flush() call at line ~4304 (currently unreached because all existing + // CleanOplog=true callers pass OptionalCache=nullptr). + using namespace projectstore_testutils; + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + + // Export side: used only to build the remote store payload. + GcManager ExportGc; + CidStore ExportCidStore(ExportGc); + CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + ExportCidStore.Initialize(ExportCidConfig); + + std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore"; + ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{}); + Ref<ProjectStore::Project> ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + + TestWorkerPools Pools; + WorkerThreadPool& NetworkPool = Pools.NetworkPool; + WorkerThreadPool& WorkerPool = Pools.WorkerPool; + + std::shared_ptr<RemoteProjectStore> RemoteStore = + SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path()); + + // Import side: starts empty so the first import genuinely downloads everything. + GcManager ImportGc; + CidStore ImportCidStore(ImportGc); + CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + ImportCidStore.Initialize(ImportCidConfig); + + std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore"; + ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{}); + Ref<ProjectStore::Project> ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + + const Oid CacheBuildId = Oid::NewOid(); + BuildStorageCache::Statistics CacheStats; + std::unique_ptr<BuildStorageCache> Cache = CreateInMemoryBuildStorageCache(256u, CacheStats); + + // First import: populate the cache. + { + Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog("oplog_clean_cache_p1", {}); + LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase1Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .PopulateCache = true}); + } + + // Second import: CleanOplog=true with a non-null cache exercises the Flush() path. + { + Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog("oplog_clean_cache_p2", {}); + CHECK_NOTHROW(LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase2Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .CleanOplog = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .PopulateCache = false})); + } +} + TEST_SUITE_END(); #endif // ZEN_WITH_TESTS |