aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp441
1 files changed, 373 insertions, 68 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 1a9dc10ef..f43f0813a 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -8,6 +8,8 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/logging/broadcastsink.h>
+#include <zencore/logging/logger.h>
#include <zencore/parallelwork.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
@@ -18,8 +20,9 @@
#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/chunking/chunkedcontent.h>
#include <zenremotestore/chunking/chunkedfile.h>
-#include <zenremotestore/operationlogoutput.h>
#include <zenstore/cidstore.h>
+#include <zenutil/logging.h>
+#include <zenutil/progress.h>
#include <numeric>
#include <unordered_map>
@@ -392,7 +395,10 @@ namespace remotestore_impl {
OodleCompressor Compressor,
OodleCompressionLevel CompressionLevel)
{
- ZEN_ASSERT(!IsFile(AttachmentPath));
+ if (IsFile(AttachmentPath))
+ {
+ ZEN_WARN("Temp attachment file already exists at '{}', truncating", AttachmentPath);
+ }
BasicFile CompressedFile;
std::error_code Ec;
CompressedFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, Ec);
@@ -448,6 +454,7 @@ namespace remotestore_impl {
};
CbObject RewriteOplog(
+ LoggerRef InLog,
ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
bool IgnoreMissingAttachments,
@@ -456,6 +463,7 @@ namespace remotestore_impl {
std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher>& UploadAttachments, // TODO: Rename to OutUploadAttachments
JobContext* OptionalContext)
{
+ ZEN_SCOPED_LOG(InLog);
size_t OpCount = 0;
CreateDirectories(AttachmentTempPath);
@@ -929,7 +937,6 @@ namespace remotestore_impl {
{
return;
}
- ZEN_ASSERT(UploadAttachment->Size != 0);
if (!UploadAttachment->RawPath.empty())
{
if (UploadAttachment->Size > (MaxChunkEmbedSize * 2))
@@ -1140,31 +1147,51 @@ namespace remotestore_impl {
std::atomic<uint64_t> ChunksCompleteCount = 0;
};
- class JobContextLogOutput : public OperationLogOutput
+ class JobContextSink : public logging::Sink
{
public:
- JobContextLogOutput(JobContext* OptionalContext) : m_OptionalContext(OptionalContext) {}
- virtual void EmitLogMessage(const logging::LogPoint& Point, fmt::format_args Args) override
+ explicit JobContextSink(JobContext* Context) : m_Context(Context) {}
+
+ void Log(const logging::LogMessage& Msg) override
{
- if (m_OptionalContext)
+ if (m_Context)
{
- fmt::basic_memory_buffer<char, 250> MessageBuffer;
- fmt::vformat_to(fmt::appender(MessageBuffer), Point.FormatString, Args);
- remotestore_impl::ReportMessage(m_OptionalContext, std::string_view(MessageBuffer.data(), MessageBuffer.size()));
+ m_Context->ReportMessage(Msg.GetPayload());
}
}
- virtual void SetLogOperationName(std::string_view Name) override { ZEN_UNUSED(Name); }
- virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) override { ZEN_UNUSED(StepIndex, StepCount); }
- virtual uint32_t GetProgressUpdateDelayMS() override { return 0; }
- virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) override
+ void Flush() override {}
+ void SetFormatter(std::unique_ptr<logging::Formatter>) override {}
+
+ private:
+ JobContext* m_Context;
+ };
+
+ class JobContextLogger
+ {
+ public:
+ explicit JobContextLogger(JobContext* OptionalContext)
{
- ZEN_UNUSED(InSubTask);
- return nullptr;
+ if (!OptionalContext)
+ {
+ return;
+ }
+ logging::SinkPtr ContextSink(new JobContextSink(OptionalContext));
+ Ref<logging::BroadcastSink> DefaultSink = GetDefaultBroadcastSink();
+ std::vector<logging::SinkPtr> Sinks;
+ if (DefaultSink)
+ {
+ Sinks.push_back(DefaultSink);
+ }
+ Sinks.push_back(std::move(ContextSink));
+ Ref<logging::BroadcastSink> Broadcast(new logging::BroadcastSink(std::move(Sinks)));
+ m_Log = Ref<logging::Logger>(new logging::Logger("jobcontext", Broadcast));
}
+ LoggerRef Log() const { return m_Log ? LoggerRef(*m_Log) : zen::Log(); }
+
private:
- JobContext* m_OptionalContext;
+ Ref<logging::Logger> m_Log;
};
void DownloadAndSaveBlockChunks(LoadOplogContext& Context,
@@ -1185,6 +1212,7 @@ namespace remotestore_impl {
&LoadAttachmentsTimer,
&DownloadStartMS](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DownloadBlockChunks");
+ ZEN_SCOPED_LOG(Context.Log);
if (AbortFlag)
{
@@ -1300,6 +1328,7 @@ namespace remotestore_impl {
&AllNeededPartialChunkHashesLookup,
ChunkDownloadedFlags](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DownloadBlock");
+ ZEN_SCOPED_LOG(Context.Log);
if (AbortFlag)
{
@@ -1366,6 +1395,7 @@ namespace remotestore_impl {
&AllNeededPartialChunkHashesLookup,
ChunkDownloadedFlags,
Bytes = std::move(BlobBuffer)](std::atomic<bool>& AbortFlag) {
+ ZEN_SCOPED_LOG(Context.Log);
if (AbortFlag)
{
return;
@@ -1715,6 +1745,7 @@ namespace remotestore_impl {
ChunkDownloadedFlags,
RetriesLeft](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DownloadBlockRanges");
+ ZEN_SCOPED_LOG(Context.Log);
try
{
uint64_t Unset = (std::uint64_t)-1;
@@ -1760,6 +1791,7 @@ namespace remotestore_impl {
OffsetAndLengths =
std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())](
std::atomic<bool>& AbortFlag) {
+ ZEN_SCOPED_LOG(Context.Log);
try
{
ZEN_ASSERT(BlockPayload.Size() > 0);
@@ -1972,6 +2004,7 @@ namespace remotestore_impl {
Context.NetworkWorkerPool,
[&Context, &AttachmentWork, RawHash, &LoadAttachmentsTimer, &DownloadStartMS, &Info](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DownloadAttachment");
+ ZEN_SCOPED_LOG(Context.Log);
if (AbortFlag)
{
@@ -2061,7 +2094,8 @@ namespace remotestore_impl {
WorkerThreadPool::EMode::EnableBacklog);
};
- void AsyncCreateBlock(ParallelWork& Work,
+ void AsyncCreateBlock(LoggerRef InLog,
+ ParallelWork& Work,
WorkerThreadPool& WorkerPool,
std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
RwLock& SectionsLock,
@@ -2071,9 +2105,10 @@ namespace remotestore_impl {
JobContext* OptionalContext)
{
Work.ScheduleWork(WorkerPool,
- [&Blocks, &SectionsLock, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, OptionalContext](
+ [InLog, &Blocks, &SectionsLock, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, OptionalContext](
std::atomic<bool>& AbortFlag) mutable {
ZEN_TRACE_CPU("CreateBlock");
+ ZEN_SCOPED_LOG(InLog);
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -2452,7 +2487,8 @@ GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> Include
}
CbObject
-BuildContainer(CidStore& ChunkStore,
+BuildContainer(LoggerRef InLog,
+ CidStore& ChunkStore,
ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
@@ -2472,7 +2508,8 @@ BuildContainer(CidStore& ChunkStore,
{
using namespace std::literals;
- std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext));
+ ZEN_SCOPED_LOG(InLog);
+ remotestore_impl::JobContextLogger JobContextOutput(OptionalContext);
Stopwatch Timer;
@@ -2485,7 +2522,8 @@ BuildContainer(CidStore& ChunkStore,
size_t TotalOpCount = Oplog.GetOplogEntryCount();
Stopwatch RewriteOplogTimer;
- CbObject SectionOps = remotestore_impl::RewriteOplog(Project,
+ CbObject SectionOps = remotestore_impl::RewriteOplog(InLog,
+ Project,
Oplog,
IgnoreMissingAttachments,
EmbedLooseFiles,
@@ -2605,7 +2643,7 @@ BuildContainer(CidStore& ChunkStore,
std::vector<uint32_t> UnusedChunkIndexes;
ReuseBlocksStatistics ReuseBlocksStats;
- ReusedBlockIndexes = FindReuseBlocks(*LogOutput,
+ ReusedBlockIndexes = FindReuseBlocks(JobContextOutput.Log(),
/*BlockReuseMinPercentLimit*/ 80,
/*IsVerbose*/ false,
ReuseBlocksStats,
@@ -2749,7 +2787,8 @@ BuildContainer(CidStore& ChunkStore,
.MaxChunkEmbedSize = MaxChunkEmbedSize,
.IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }});
- auto OnNewBlock = [&Work,
+ auto OnNewBlock = [&Log,
+ &Work,
&WorkerPool,
BuildBlocks,
&BlockCreateProgressTimer,
@@ -2774,7 +2813,8 @@ BuildContainer(CidStore& ChunkStore,
size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
if (BuildBlocks)
{
- remotestore_impl::AsyncCreateBlock(Work,
+ remotestore_impl::AsyncCreateBlock(Log(),
+ Work,
WorkerPool,
std::move(ChunksInBlock),
BlocksLock,
@@ -3007,6 +3047,17 @@ BuildContainer(CidStore& ChunkStore,
return {};
}
+ // Reused blocks were not composed (their chunks were erased from UploadAttachments) but must
+ // still appear in the container so that a fresh receiver knows to download them.
+ if (BuildBlocks)
+ {
+ for (size_t KnownBlockIndex : ReusedBlockIndexes)
+ {
+ const ChunkBlockDescription& Reused = KnownBlocks[KnownBlockIndex];
+ Blocks.push_back(Reused);
+ }
+ }
+
CbObjectWriter OplogContainerWriter;
RwLock::SharedLockScope _(BlocksLock);
OplogContainerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
@@ -3096,7 +3147,8 @@ BuildContainer(CidStore& ChunkStore,
}
CbObject
-BuildContainer(CidStore& ChunkStore,
+BuildContainer(LoggerRef InLog,
+ CidStore& ChunkStore,
ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
WorkerThreadPool& WorkerPool,
@@ -3112,7 +3164,8 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
{
- return BuildContainer(ChunkStore,
+ return BuildContainer(InLog,
+ ChunkStore,
Project,
Oplog,
MaxBlockSize,
@@ -3132,7 +3185,8 @@ BuildContainer(CidStore& ChunkStore,
}
void
-SaveOplog(CidStore& ChunkStore,
+SaveOplog(LoggerRef InLog,
+ CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
@@ -3149,6 +3203,7 @@ SaveOplog(CidStore& ChunkStore,
{
using namespace std::literals;
+ ZEN_SCOPED_LOG(InLog);
Stopwatch Timer;
remotestore_impl::UploadInfo Info;
@@ -3168,8 +3223,8 @@ SaveOplog(CidStore& ChunkStore,
std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks;
tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
- auto MakeTempBlock = [AttachmentTempPath, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
- ChunkBlockDescription&& Block) {
+ auto MakeTempBlock = [&Log, AttachmentTempPath, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
+ ChunkBlockDescription&& Block) {
std::filesystem::path BlockPath = AttachmentTempPath;
BlockPath.append(Block.BlockHash.ToHexString());
IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath);
@@ -3180,8 +3235,8 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockSize));
};
- auto UploadBlock = [&RemoteStore, &RemoteStoreInfo, &Info, OptionalContext](CompressedBuffer&& CompressedBlock,
- ChunkBlockDescription&& Block) {
+ auto UploadBlock = [&Log, &RemoteStore, &RemoteStoreInfo, &Info, OptionalContext](CompressedBuffer&& CompressedBlock,
+ ChunkBlockDescription&& Block) {
IoHash BlockHash = Block.BlockHash;
uint64_t CompressedSize = CompressedBlock.GetCompressedSize();
RemoteProjectStore::SaveAttachmentResult Result =
@@ -3201,13 +3256,13 @@ SaveOplog(CidStore& ChunkStore,
};
std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>> BlockChunks;
- auto OnBlockChunks = [&BlockChunks](std::vector<std::pair<IoHash, FetchChunkFunc>>&& Chunks) {
+ auto OnBlockChunks = [&Log, &BlockChunks](std::vector<std::pair<IoHash, FetchChunkFunc>>&& Chunks) {
BlockChunks.push_back({std::make_move_iterator(Chunks.begin()), std::make_move_iterator(Chunks.end())});
ZEN_DEBUG("Found {} block chunks", Chunks.size());
};
- auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash,
- TGetAttachmentBufferFunc&& GetBufferFunc) {
+ auto OnLargeAttachment = [&Log, &AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash,
+ TGetAttachmentBufferFunc&& GetBufferFunc) {
{
RwLock::ExclusiveLockScope _(AttachmentsLock);
LargeAttachments.insert(AttachmentHash);
@@ -3286,7 +3341,8 @@ SaveOplog(CidStore& ChunkStore,
}
}
- CbObject OplogContainerObject = BuildContainer(ChunkStore,
+ CbObject OplogContainerObject = BuildContainer(InLog,
+ ChunkStore,
Project,
Oplog,
MaxBlockSize,
@@ -3694,7 +3750,8 @@ LoadOplog(LoadOplogContext&& Context)
{
using namespace std::literals;
- std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(Context.OptionalJobContext));
+ ZEN_SCOPED_LOG(Context.Log);
+ remotestore_impl::JobContextLogger JobContextOutput(Context.OptionalJobContext);
remotestore_impl::DownloadInfo Info;
@@ -3985,7 +4042,7 @@ LoadOplog(LoadOplogContext&& Context)
ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size());
ChunkBlockAnalyser PartialAnalyser(
- *LogOutput,
+ JobContextOutput.Log(),
BlockDescriptions.Blocks,
ChunkBlockAnalyser::Options{.IsQuiet = false,
.IsVerbose = false,
@@ -4108,10 +4165,10 @@ LoadOplog(LoadOplogContext&& Context)
std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
DechunkWork.ScheduleWork(
Context.WorkerPool,
- [&Context, TempFileName, &FilesToDechunk, ChunkedIndex, &Info](std::atomic<bool>& AbortFlag) {
+ [&Log, &Context, TempFileName, &FilesToDechunk, ChunkedIndex, &Info](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DechunkAttachment");
- auto _ = MakeGuard([&TempFileName] {
+ auto _ = MakeGuard([&Log, &TempFileName] {
std::error_code Ec;
if (IsFile(TempFileName, Ec))
{
@@ -4712,7 +4769,8 @@ TEST_CASE_TEMPLATE("project.store.export",
WorkerThreadPool& NetworkPool = Pools.NetworkPool;
WorkerThreadPool& WorkerPool = Pools.WorkerPool;
- SaveOplog(CidStore,
+ SaveOplog(Log(),
+ CidStore,
*RemoteStore,
*Project.Get(),
*Oplog,
@@ -4732,7 +4790,8 @@ TEST_CASE_TEMPLATE("project.store.export",
CapturingJobContext Ctx;
auto DoLoad = [&](bool Force, bool Clean) {
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.OptionalCache = nullptr,
.CacheBuildId = Oid::Zero,
@@ -4793,7 +4852,8 @@ SetupExportStore(CidStore& CidStore,
/*.ForceEnableTempBlocks =*/false};
std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options);
- SaveOplog(CidStore,
+ SaveOplog(Log(),
+ CidStore,
*RemoteStore,
Project,
*Oplog,
@@ -4856,7 +4916,8 @@ SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool, WorkerThreadPool& Wo
/*.ForceDisableBlocks =*/false,
/*.ForceEnableTempBlocks =*/false};
std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options);
- SaveOplog(LocalCidStore,
+ SaveOplog(Log(),
+ LocalCidStore,
*RemoteStore,
*LocalProject,
*Oplog,
@@ -5024,7 +5085,8 @@ TEST_CASE("project.store.import.context_settings")
bool PopulateCache,
bool ForceDownload) -> void {
Ref<ProjectStore::Oplog> ImportOplog = ImportProject->NewOplog(fmt::format("import_{}", OpJobIndex++), {});
- LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = ImportCidStore,
.RemoteStore = *RemoteStore,
.OptionalCache = OptCache,
.CacheBuildId = CacheBuildId,
@@ -5131,7 +5193,8 @@ TEST_CASE("project.store.import.context_settings")
// StoreMaxRangeCountPerRequest=128 -> all three ranges sent in one LoadAttachmentRanges call.
Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_multi_{}", OpJobIndex++), {});
- LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = ImportCidStore,
.RemoteStore = *PartialRemoteStore,
.OptionalCache = nullptr,
.CacheBuildId = CacheBuildId,
@@ -5163,7 +5226,8 @@ TEST_CASE("project.store.import.context_settings")
SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash);
Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_single_{}", OpJobIndex++), {});
- LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = ImportCidStore,
.RemoteStore = *PartialRemoteStore,
.OptionalCache = nullptr,
.CacheBuildId = CacheBuildId,
@@ -5194,7 +5258,8 @@ TEST_CASE("project.store.import.context_settings")
// Phase 1: full block download from remote populates the cache.
{
Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p1_{}", OpJobIndex++), {});
- LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = ImportCidStore,
.RemoteStore = *PartialRemoteStore,
.OptionalCache = Cache.get(),
.CacheBuildId = CacheBuildId,
@@ -5226,7 +5291,8 @@ TEST_CASE("project.store.import.context_settings")
SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash);
Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p2_{}", OpJobIndex++), {});
- LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = Phase2CidStore,
.RemoteStore = *PartialRemoteStore,
.OptionalCache = Cache.get(),
.CacheBuildId = CacheBuildId,
@@ -5259,7 +5325,8 @@ TEST_CASE("project.store.import.context_settings")
// Phase 1: full block download from remote into cache.
{
Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p1_{}", OpJobIndex++), {});
- LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = ImportCidStore,
.RemoteStore = *PartialRemoteStore,
.OptionalCache = Cache.get(),
.CacheBuildId = CacheBuildId,
@@ -5291,7 +5358,8 @@ TEST_CASE("project.store.import.context_settings")
SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash);
Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p2_{}", OpJobIndex++), {});
- LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = Phase2CidStore,
.RemoteStore = *PartialRemoteStore,
.OptionalCache = Cache.get(),
.CacheBuildId = CacheBuildId,
@@ -5373,7 +5441,8 @@ RunSaveOplog(CidStore& CidStore,
{
*OutRemoteStore = RemoteStore;
}
- SaveOplog(CidStore,
+ SaveOplog(Log(),
+ CidStore,
*RemoteStore,
Project,
Oplog,
@@ -5476,7 +5545,8 @@ TEST_CASE("project.store.embed_loose_files_true")
/*ForceDisableBlocks=*/false,
&RemoteStore);
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_true_import", {});
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -5530,7 +5600,8 @@ TEST_CASE("project.store.embed_loose_files_false" * doctest::skip()) // superse
&RemoteStore);
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_false_import", {});
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -5693,7 +5764,8 @@ TEST_CASE("project.store.export.large_file_attachment_direct")
&RemoteStore);
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_direct_import", {});
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -5750,7 +5822,8 @@ TEST_CASE("project.store.export.large_file_attachment_via_temp")
&RemoteStore);
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_via_temp_import", {});
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -5804,7 +5877,8 @@ TEST_CASE("project.store.export.large_chunk_from_cidstore")
&RemoteStore);
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_cid_import", {});
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -5867,7 +5941,8 @@ TEST_CASE("project.store.export.block_reuse")
BlockHashesAfterFirst.push_back(B.BlockHash);
}
- SaveOplog(CidStore,
+ SaveOplog(Log(),
+ CidStore,
*RemoteStore,
*Project,
*Oplog,
@@ -5944,7 +6019,8 @@ TEST_CASE("project.store.export.max_chunks_per_block")
CHECK(KnownBlocks.Blocks.size() >= 2);
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_max_chunks_import", {});
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -6027,7 +6103,8 @@ TEST_CASE("project.store.export.max_data_per_block")
CHECK(KnownBlocks.Blocks.size() >= 2);
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_max_data_per_block_import", {});
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -6155,7 +6232,8 @@ TEST_CASE("project.store.embed_loose_files_zero_data_hash")
&RemoteStore);
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_zero_data_hash_import", {});
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -6209,7 +6287,8 @@ TEST_CASE("project.store.embed_loose_files_already_resolved")
&RemoteStore1);
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_already_resolved_import", {});
- LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore1,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -6296,7 +6375,8 @@ TEST_CASE("project.store.import.missing_attachment")
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_throw", {});
REQUIRE(ImportOplog);
CapturingJobContext Ctx;
- CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -6313,7 +6393,8 @@ TEST_CASE("project.store.import.missing_attachment")
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_ignore", {});
REQUIRE(ImportOplog);
CapturingJobContext Ctx;
- CHECK_NOTHROW(LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ CHECK_NOTHROW(LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -6358,7 +6439,8 @@ TEST_CASE("project.store.import.error.load_container_failure")
REQUIRE(ImportOplog);
CapturingJobContext Ctx;
- CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -6785,6 +6867,7 @@ TEST_CASE("buildcontainer.public_overload_smoke")
std::atomic<int> BlockCallCount{0};
CbObject Container = BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -6828,6 +6911,7 @@ TEST_CASE("buildcontainer.build_blocks_false_on_block_chunks")
std::atomic<int> BlockChunksCallCount{0};
CbObject Container = BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -6893,6 +6977,7 @@ TEST_CASE("buildcontainer.ignore_missing_binary_attachment_warn")
{
CapturingJobContext Ctx;
BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -6916,6 +7001,7 @@ TEST_CASE("buildcontainer.ignore_missing_binary_attachment_warn")
SUBCASE("throw")
{
CHECK_THROWS(BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -6967,6 +7053,7 @@ TEST_CASE("buildcontainer.ignore_missing_file_attachment_warn")
{
CapturingJobContext Ctx;
BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -6990,6 +7077,7 @@ TEST_CASE("buildcontainer.ignore_missing_file_attachment_warn")
SUBCASE("throw")
{
CHECK_THROWS(BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -7008,6 +7096,61 @@ TEST_CASE("buildcontainer.ignore_missing_file_attachment_warn")
}
}
+TEST_CASE("buildcontainer.zero_byte_file_attachment")
+{
+ // A zero-byte file on disk is a valid attachment. BuildContainer must process
+ // it without hitting ZEN_ASSERT(UploadAttachment->Size != 0) in
+ // ResolveAttachments. The empty file flows through the compress-inline path
+ // and becomes a LooseUploadAttachment with raw size 0.
+ using namespace projectstore_testutils;
+ using namespace std::literals;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ std::unique_ptr<ProjectStore> ProjectStoreDummy;
+ Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy);
+
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list<size_t>{512});
+
+ Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("bc_zero_byte_file", {});
+ REQUIRE(Oplog);
+ Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts));
+
+ // Truncate the file to zero bytes after the oplog entry is created.
+ // The file still exists on disk so RewriteOplog's IsFile() check passes,
+ // but MakeFromFile returns a zero-size buffer.
+ std::filesystem::resize_file(FileAtts[0].second, 0);
+
+ WorkerThreadPool WorkerPool(GetWorkerCount());
+
+ CbObject Container = BuildContainer(
+ Log(),
+ CidStore,
+ *Project,
+ *Oplog,
+ WorkerPool,
+ 64u * 1024u,
+ 1000,
+ 32u * 1024u,
+ 64u * 1024u * 1024u,
+ /*BuildBlocks=*/true,
+ /*IgnoreMissingAttachments=*/false,
+ /*AllowChunking=*/true,
+ [](CompressedBuffer&&, ChunkBlockDescription&&) {},
+ [](const IoHash&, TGetAttachmentBufferFunc&&) {},
+ [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
+ /*EmbedLooseFiles=*/true);
+
+ CHECK(Container.GetSize() > 0);
+
+ // The zero-byte attachment is packed into a block via the compress-inline path.
+ CbArrayView Blocks = Container["blocks"sv].AsArrayView();
+ CHECK(Blocks.Num() > 0);
+}
+
TEST_CASE("buildcontainer.embed_loose_files_false_no_rewrite")
{
// EmbedLooseFiles=false: RewriteOp is skipped for file-op entries; they pass through
@@ -7030,6 +7173,7 @@ TEST_CASE("buildcontainer.embed_loose_files_false_no_rewrite")
WorkerThreadPool WorkerPool(GetWorkerCount());
CbObject Container = BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -7080,6 +7224,7 @@ TEST_CASE("buildcontainer.allow_chunking_false")
{
std::atomic<int> LargeAttachmentCallCount{0};
BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -7103,6 +7248,7 @@ TEST_CASE("buildcontainer.allow_chunking_false")
// Chunking branch in FindChunkSizes is taken, but the ~4 KB chunk still exceeds MaxChunkEmbedSize -> OnLargeAttachment.
std::atomic<int> LargeAttachmentCallCount{0};
BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -7144,6 +7290,7 @@ TEST_CASE("buildcontainer.async_on_block_exception_propagates")
WorkerThreadPool WorkerPool(GetWorkerCount());
CHECK_THROWS_AS(BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -7184,6 +7331,7 @@ TEST_CASE("buildcontainer.on_large_attachment_exception_propagates")
WorkerThreadPool WorkerPool(GetWorkerCount());
CHECK_THROWS_AS(BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -7226,6 +7374,7 @@ TEST_CASE("buildcontainer.context_cancellation_aborts")
Ctx.m_Cancel = true;
CHECK_NOTHROW(BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -7265,6 +7414,7 @@ TEST_CASE("buildcontainer.context_progress_reporting")
CapturingJobContext Ctx;
BuildContainer(
+ Log(),
CidStore,
*Project,
*Oplog,
@@ -7428,7 +7578,8 @@ TEST_CASE("loadoplog.missing_block_attachment_ignored")
CapturingJobContext Ctx;
Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_block_import", {});
- CHECK_NOTHROW(LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ CHECK_NOTHROW(LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = CidStore,
.RemoteStore = *RemoteStore,
.Oplog = *ImportOplog,
.NetworkWorkerPool = NetworkPool,
@@ -7501,7 +7652,8 @@ TEST_CASE("loadoplog.clean_oplog_with_populated_cache")
{
Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog("oplog_clean_cache_p1", {});
- LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = ImportCidStore,
.RemoteStore = *RemoteStore,
.OptionalCache = Cache.get(),
.CacheBuildId = CacheBuildId,
@@ -7517,7 +7669,8 @@ TEST_CASE("loadoplog.clean_oplog_with_populated_cache")
{
Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog("oplog_clean_cache_p2", {});
- CHECK_NOTHROW(LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ CHECK_NOTHROW(LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = ImportCidStore,
.RemoteStore = *RemoteStore,
.OptionalCache = Cache.get(),
.CacheBuildId = CacheBuildId,
@@ -7532,6 +7685,158 @@ TEST_CASE("loadoplog.clean_oplog_with_populated_cache")
}
}
+TEST_CASE("project.store.export.block_reuse_fresh_receiver")
+{
+ // Regression test: after a second export that reuses existing blocks, a fresh import must still
+ // receive all chunks. The bug: FindReuseBlocks erases reused-block chunks from UploadAttachments,
+ // but never adds the reused blocks to the container's "blocks" section. A fresh receiver then
+ // silently misses those chunks because ParseOplogContainer never sees them.
+ using namespace projectstore_testutils;
+ using namespace std::literals;
+
+ ScopedTemporaryDirectory TempDir;
+ ScopedTemporaryDirectory ExportDir;
+
+ // -- Export side ----------------------------------------------------------
+ GcManager ExportGc;
+ CidStore ExportCidStore(ExportGc);
+ CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ ExportCidStore.Initialize(ExportCidConfig);
+
+ std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore";
+ ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{});
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+ Ref<ProjectStore::Project> ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+
+ // 20 KB with None encoding: compressed ~ 20 KB < MaxChunkEmbedSize (32 KB) -> packed into blocks.
+ Ref<ProjectStore::Oplog> Oplog = ExportProject->NewOplog("oplog_reuse_rt", {});
+ REQUIRE(Oplog);
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
+ Oid::NewOid(),
+ CreateAttachments(std::initializer_list<size_t>{20u * 1024u, 20u * 1024u}, OodleCompressionLevel::None)));
+
+ TestWorkerPools Pools;
+ WorkerThreadPool& NetworkPool = Pools.NetworkPool;
+ WorkerThreadPool& WorkerPool = Pools.WorkerPool;
+
+ constexpr size_t MaxBlockSize = 64u * 1024u;
+ constexpr size_t MaxChunksPerBlock = 1000;
+ constexpr size_t MaxChunkEmbedSize = 32u * 1024u;
+ constexpr size_t ChunkFileSizeLimit = 64u * 1024u * 1024u;
+
+ // First export: creates blocks on disk.
+ FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize,
+ .MaxChunksPerBlock = MaxChunksPerBlock,
+ .MaxChunkEmbedSize = MaxChunkEmbedSize,
+ .ChunkFileSizeLimit = ChunkFileSizeLimit},
+ /*.FolderPath =*/ExportDir.Path(),
+ /*.Name =*/std::string("oplog_reuse_rt"),
+ /*.OptionalBaseName =*/std::string(),
+ /*.ForceDisableBlocks =*/false,
+ /*.ForceEnableTempBlocks =*/false};
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options);
+ SaveOplog(Log(),
+ ExportCidStore,
+ *RemoteStore,
+ *ExportProject,
+ *Oplog,
+ NetworkPool,
+ WorkerPool,
+ MaxBlockSize,
+ MaxChunksPerBlock,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ /*EmbedLooseFiles*/ true,
+ /*ForceUpload*/ false,
+ /*IgnoreMissingAttachments*/ false,
+ /*OptionalContext*/ nullptr);
+
+ // Verify first export produced blocks.
+ RemoteProjectStore::GetKnownBlocksResult KnownAfterFirst = RemoteStore->GetKnownBlocks();
+ REQUIRE(!KnownAfterFirst.Blocks.empty());
+
+ // Second export to the SAME store: triggers block reuse via GetKnownBlocks.
+ SaveOplog(Log(),
+ ExportCidStore,
+ *RemoteStore,
+ *ExportProject,
+ *Oplog,
+ NetworkPool,
+ WorkerPool,
+ MaxBlockSize,
+ MaxChunksPerBlock,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ /*EmbedLooseFiles*/ true,
+ /*ForceUpload*/ false,
+ /*IgnoreMissingAttachments*/ false,
+ /*OptionalContext*/ nullptr);
+
+ // Verify the container has no duplicate block entries.
+ {
+ RemoteProjectStore::LoadContainerResult ContainerResult = RemoteStore->LoadContainer();
+ REQUIRE(ContainerResult.ErrorCode == 0);
+ std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(ContainerResult.ContainerObject);
+ REQUIRE(!BlockHashes.empty());
+ std::unordered_set<IoHash, IoHash::Hasher> UniqueBlockHashes(BlockHashes.begin(), BlockHashes.end());
+ CHECK(UniqueBlockHashes.size() == BlockHashes.size());
+ }
+
+ // Collect all attachment hashes referenced by the oplog ops.
+ std::unordered_set<IoHash, IoHash::Hasher> ExpectedHashes;
+ Oplog->IterateOplogWithKey([&](int, const Oid&, CbObjectView Op) {
+ Op.IterateAttachments([&](CbFieldView FieldView) { ExpectedHashes.insert(FieldView.AsAttachment()); });
+ });
+ REQUIRE(!ExpectedHashes.empty());
+
+ // -- Import side (fresh, empty CAS) --------------------------------------
+ GcManager ImportGc;
+ CidStore ImportCidStore(ImportGc);
+ CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ ImportCidStore.Initialize(ImportCidConfig);
+
+ std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore";
+ ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{});
+ Ref<ProjectStore::Project> ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+
+ Ref<ProjectStore::Oplog> ImportOplog = ImportProject->NewOplog("oplog_reuse_rt_import", {});
+ REQUIRE(ImportOplog);
+
+ LoadOplog(LoadOplogContext{.Log = Log(),
+ .ChunkStore = ImportCidStore,
+ .RemoteStore = *RemoteStore,
+ .Oplog = *ImportOplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = true,
+ .IgnoreMissingAttachments = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::All});
+
+ // Every attachment hash from the original oplog must be present in the import CAS.
+ for (const IoHash& Hash : ExpectedHashes)
+ {
+ CHECK_MESSAGE(ImportCidStore.ContainsChunk(Hash), "Missing chunk after import: ", Hash);
+ }
+}
+
TEST_SUITE_END();
#endif // ZEN_WITH_TESTS