aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZousar Shaker <[email protected]>2025-03-26 17:01:30 -0600
committerGitHub Enterprise <[email protected]>2025-03-26 17:01:30 -0600
commit56af48235a5394b2906e9b347d43404394a4e756 (patch)
tree05530b3da98773794320e5a2ab507ec879bf6e9d /src
parentDescriptive type conversion messages (diff)
parentzen build cache service (#318) (diff)
downloadzen-56af48235a5394b2906e9b347d43404394a4e756.tar.xz
zen-56af48235a5394b2906e9b347d43404394a4e756.zip
Merge branch 'main' into zs/ui-show-cook-artifacts
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp1883
-rw-r--r--src/zen/cmds/builds_cmd.h3
-rw-r--r--src/zenhttp/httpclient.cpp26
-rw-r--r--src/zenhttp/include/zenhttp/formatters.h46
-rw-r--r--src/zenhttp/packageformat.cpp45
-rw-r--r--src/zenserver/admin/admin.cpp16
-rw-r--r--src/zenserver/admin/admin.h3
-rw-r--r--src/zenserver/buildstore/httpbuildstore.cpp526
-rw-r--r--src/zenserver/buildstore/httpbuildstore.h65
-rw-r--r--src/zenserver/config.cpp10
-rw-r--r--src/zenserver/config.h12
-rw-r--r--src/zenserver/workspaces/httpworkspaces.cpp2
-rw-r--r--src/zenserver/zenserver.cpp19
-rw-r--r--src/zenserver/zenserver.h4
-rw-r--r--src/zenstore-test/zenstore-test.cpp2
-rw-r--r--src/zenstore/blockstore.cpp2
-rw-r--r--src/zenstore/buildstore/buildstore.cpp1475
-rw-r--r--src/zenstore/compactcas.cpp4
-rw-r--r--src/zenstore/compactcas.h4
-rw-r--r--src/zenstore/gc.cpp21
-rw-r--r--src/zenstore/include/zenstore/accesstime.h47
-rw-r--r--src/zenstore/include/zenstore/blockstore.h2
-rw-r--r--src/zenstore/include/zenstore/buildstore/buildstore.h186
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h1
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h38
-rw-r--r--src/zenstore/include/zenstore/gc.h4
-rw-r--r--src/zenutil/buildstoragecache.cpp362
-rw-r--r--src/zenutil/chunkblock.cpp2
-rw-r--r--src/zenutil/filebuildstorage.cpp27
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h6
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h52
-rw-r--r--src/zenutil/include/zenutil/logging/rotatingfilesink.h1
-rw-r--r--src/zenutil/jupiter/jupiterbuildstorage.cpp35
33 files changed, 3984 insertions, 947 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 889ccef0b..b2ad579f1 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -20,6 +20,7 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/httpclientauth.h>
#include <zenhttp/httpcommon.h>
+#include <zenutil/buildstoragecache.h>
#include <zenutil/chunkblock.h>
#include <zenutil/chunkedcontent.h>
#include <zenutil/chunkedfile.h>
@@ -51,6 +52,8 @@ ZEN_THIRD_PARTY_INCLUDES_END
#define EXTRA_VERIFY 0
+#define ZEN_CLOUD_STORAGE "Cloud Storage"
+
namespace zen {
namespace {
static std::atomic<bool> AbortFlag = false;
@@ -87,6 +90,8 @@ namespace {
const double DefaultLatency = 0; // .0010;
const double DefaultDelayPerKBSec = 0; // 0.00005;
+ const bool SingleThreaded = false;
+
const std::string ZenFolderName = ".zen";
const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
@@ -204,22 +209,27 @@ namespace {
{
try
{
- std::filesystem::remove(LocalFilePath);
- }
- catch (const std::exception&)
- {
- // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time
- Sleep(200);
- try
- {
- std::filesystem::remove(LocalFilePath);
- }
- catch (const std::exception& Ex)
+ std::error_code Ec;
+ std::filesystem::remove(LocalFilePath, Ec);
+ if (Ec)
{
- ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what());
- CleanWipe = false;
+ // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time
+ Ec.clear();
+ if (std::filesystem::exists(LocalFilePath, Ec) || Ec)
+ {
+ Sleep(200);
+ if (std::filesystem::exists(LocalFilePath))
+ {
+ std::filesystem::remove(LocalFilePath);
+ }
+ }
}
}
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what());
+ CleanWipe = false;
+ }
}
for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories)
@@ -425,7 +435,7 @@ namespace {
Path,
std::move(IsAcceptedFolder),
std::move(IsAcceptedFile),
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
UsePlainProgress ? 5000 : 200,
[](bool, std::ptrdiff_t) {},
AbortFlag);
@@ -439,7 +449,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent FolderContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
Path,
Content,
ChunkController,
@@ -603,11 +613,9 @@ namespace {
struct WriteChunkStatistics
{
- std::atomic<uint32_t> ChunkCountWritten = 0;
- std::atomic<uint64_t> ChunkBytesWritten = 0;
- uint64_t DownloadTimeUs = 0;
- uint64_t WriteTimeUs = 0;
- uint64_t WriteChunksElapsedWallTimeUs = 0;
+ uint64_t DownloadTimeUs = 0;
+ uint64_t WriteTimeUs = 0;
+ uint64_t WriteChunksElapsedWallTimeUs = 0;
};
struct RebuildFolderStateStatistics
@@ -626,6 +634,15 @@ namespace {
uint64_t VerifyElapsedWallTimeUs = 0;
};
+ struct StorageInstance
+ {
+ std::unique_ptr<HttpClient> BuildStorageHttp;
+ std::unique_ptr<BuildStorage> BuildStorage;
+ std::string StorageName;
+ std::unique_ptr<HttpClient> CacheHttp;
+ std::unique_ptr<BuildStorageCache> BuildCacheStorage;
+ };
+
std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes,
const std::span<const uint32_t> LocalChunkOrder,
const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
@@ -772,7 +789,7 @@ namespace {
std::span<const uint32_t> ChunkCounts,
std::span<const IoHash> LocalChunkHashes,
std::span<const uint64_t> LocalChunkRawSizes,
- std::vector<uint32_t> AbsoluteChunkOrders,
+ const std::vector<uint32_t>& AbsoluteChunkOrders,
const std::span<const uint32_t> LooseLocalChunkIndexes,
const std::span<IoHash> BlockHashes)
{
@@ -1444,7 +1461,7 @@ namespace {
for (auto& WorkItem : WorkItems)
{
Work.ScheduleWork(
- NetworkPool, // GetSyncWorkerPool(),//
+ NetworkPool,
[WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
ZEN_TRACE_CPU("DownloadLargeBlob_Work");
if (!AbortFlag)
@@ -1513,15 +1530,16 @@ namespace {
}
ValidateStats.BlockAttachmentCount = BlockAttachments.size();
- std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments);
+ std::vector<ChunkBlockDescription> VerifyBlockDescriptions =
+ ParseChunkBlockDescriptionList(Storage.GetBlockMetadatas(BuildId, BlockAttachments));
if (VerifyBlockDescriptions.size() != BlockAttachments.size())
{
throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing",
BlockAttachments.size() - VerifyBlockDescriptions.size()));
}
- WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
ParallellWork Work(AbortFlag);
const std::filesystem::path TempFolder = ".zen-tmp";
@@ -1881,7 +1899,7 @@ namespace {
void GenerateBuildBlocks(const std::filesystem::path& Path,
const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
- BuildStorage& Storage,
+ StorageInstance& Storage,
const Oid& BuildId,
const std::vector<std::vector<uint32_t>>& NewBlockChunks,
GeneratedBlocks& OutBlocks,
@@ -1904,9 +1922,8 @@ namespace {
RwLock Lock;
- WorkerThreadPool& GenerateBlobsPool =
- GetMediumWorkerPool(EWorkloadType::Burst); // GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
- WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
+ WorkerThreadPool& GenerateBlobsPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& UploadBlocksPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
@@ -2005,21 +2022,35 @@ namespace {
const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
- Storage.PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- std::move(Payload).GetCompressed());
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ Payload.GetCompressed());
+ }
+
+ Storage.BuildStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ std::move(Payload).GetCompressed());
UploadStats.BlocksBytes += CompressedBlockSize;
+
ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockHash,
NiceBytes(CompressedBlockSize),
OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
- Storage.PutBlockMetadata(BuildId,
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- BlockMetaData);
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockHash,
NiceBytes(BlockMetaData.GetSize()));
OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
@@ -2074,7 +2105,7 @@ namespace {
}
}
- void UploadPartBlobs(BuildStorage& Storage,
+ void UploadPartBlobs(StorageInstance& Storage,
const Oid& BuildId,
const std::filesystem::path& Path,
const ChunkedFolderContent& Content,
@@ -2092,8 +2123,8 @@ namespace {
{
ProgressBar ProgressBar(UsePlainProgress);
- WorkerThreadPool& ReadChunkPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& UploadChunkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& ReadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& UploadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
FilteredRate FilteredGenerateBlockBytesPerSecond;
FilteredRate FilteredCompressedBytesPerSecond;
@@ -2177,18 +2208,26 @@ namespace {
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ }
+ Storage.BuildStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockHash,
NiceBytes(PayloadSize),
NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
UploadedBlockSize += PayloadSize;
UploadStats.BlocksBytes += PayloadSize;
- Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
- ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(BlockMetaData.GetSize()));
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
+ ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize()));
NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
@@ -2214,12 +2253,17 @@ namespace {
ZEN_TRACE_CPU("AsyncUploadLooseChunk");
const uint64_t PayloadSize = Payload.GetSize();
- ;
+
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ }
+
if (PayloadSize >= LargeAttachmentSize)
{
ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart");
UploadStats.MultipartAttachmentCount++;
- std::vector<std::function<void()>> MultipartWork = Storage.PutLargeBuildBlob(
+ std::vector<std::function<void()>> MultipartWork = Storage.BuildStorage->PutLargeBuildBlob(
BuildId,
RawHash,
ZenContentType::kCompressedBinary,
@@ -2232,7 +2276,7 @@ namespace {
PartPayload.SetContentType(ZenContentType::kBinary);
return PartPayload;
},
- [&, RawSize](uint64_t SentBytes, bool IsComplete) {
+ [&, Payload, RawSize](uint64_t SentBytes, bool IsComplete) {
UploadStats.ChunksBytes += SentBytes;
UploadedCompressedChunkSize += SentBytes;
if (IsComplete)
@@ -2264,7 +2308,7 @@ namespace {
else
{
ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart");
- Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ Storage.BuildStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize));
UploadStats.ChunksBytes += Payload.GetSize();
UploadStats.ChunkCount++;
@@ -2303,7 +2347,7 @@ namespace {
if (!AbortFlag)
{
Work.ScheduleWork(
- ReadChunkPool, // GetSyncWorkerPool()
+ SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool,
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -2362,7 +2406,7 @@ namespace {
{
const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex];
Work.ScheduleWork(
- ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool,
+ SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool,
[&, ChunkIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -2598,7 +2642,7 @@ namespace {
return FilteredReuseBlockIndexes;
};
- void UploadFolder(BuildStorage& Storage,
+ void UploadFolder(StorageInstance& Storage,
const Oid& BuildId,
const Oid& BuildPartId,
const std::string_view BuildPartName,
@@ -2654,7 +2698,7 @@ namespace {
ZEN_TRACE_CPU("CreateBuild");
Stopwatch PutBuildTimer;
- CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData);
+ CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData);
Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
Result.PayloadSize = MetaData.GetSize();
@@ -2663,7 +2707,7 @@ namespace {
{
ZEN_TRACE_CPU("PutBuild");
Stopwatch GetBuildTimer;
- CbObject Build = Storage.GetBuild(BuildId);
+ CbObject Build = Storage.BuildStorage->GetBuild(BuildId);
Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
Result.PayloadSize = Build.GetSize();
if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
@@ -2681,7 +2725,7 @@ namespace {
{
ZEN_TRACE_CPU("FindBlocks");
Stopwatch KnownBlocksTimer;
- Result.KnownBlocks = Storage.FindBlocks(BuildId);
+ Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId));
FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
@@ -2796,10 +2840,10 @@ namespace {
}
return true;
},
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
UsePlainProgress ? 5000 : 200,
[&](bool, std::ptrdiff_t) {
- ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
+ ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
},
AbortFlag);
}
@@ -2855,7 +2899,7 @@ namespace {
FilteredBytesHashed.Start();
LocalContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
Path,
Content,
*ChunkController,
@@ -2985,7 +3029,7 @@ namespace {
(FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount)
: 0.0;
ZEN_CONSOLE(
- "Found {} chunks in {} ({}) blocks eligeble for reuse in {}\n"
+ "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n"
" Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n"
" Accepting {} ({}) redundant chunks ({:.1f}%)\n"
" Rejected {} ({}) chunks in {} blocks\n"
@@ -3204,7 +3248,8 @@ namespace {
}
Stopwatch PutBuildPartResultTimer;
- std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest);
+ std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
+ Storage.BuildStorage->PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest);
ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.",
NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
NiceBytes(PartManifest.GetSize()),
@@ -3289,7 +3334,7 @@ namespace {
while (!AbortFlag)
{
Stopwatch FinalizeBuildPartTimer;
- std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash);
+ std::vector<IoHash> Needs = Storage.BuildStorage->FinalizeBuildPart(BuildId, BuildPartId, PartHash);
ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.",
NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
Needs.size());
@@ -3304,7 +3349,7 @@ namespace {
if (CreateBuild && !AbortFlag)
{
Stopwatch FinalizeBuildTimer;
- Storage.FinalizeBuild(BuildId);
+ Storage.BuildStorage->FinalizeBuild(BuildId);
ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
}
@@ -3321,7 +3366,13 @@ namespace {
{
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
UploadStats.BlocksBytes += BlockMetaData.GetSize();
NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
UploadBlockMetadataCount++;
@@ -3340,7 +3391,7 @@ namespace {
DownloadStatistics ValidateDownloadStats;
if (PostUploadVerify && !AbortFlag)
{
- ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats);
}
ZEN_CONSOLE_VERBOSE(
@@ -3570,7 +3621,7 @@ namespace {
ValidateInfo);
- Storage.PutBuildPartStats(
+ Storage.BuildStorage->PutBuildPartStats(
BuildId,
BuildPartId,
{{"totalSize", double(LocalFolderScanStats.FoundFileByteCount.load())},
@@ -3597,7 +3648,7 @@ namespace {
ProgressBar ProgressBar(UsePlainProgress);
- WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
ParallellWork Work(AbortFlag);
@@ -3873,6 +3924,22 @@ namespace {
return ChunkTargetPtrs;
};
+ uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const ChunkedContentLookup& Lookup,
+ uint32_t ChunkIndex)
+ {
+ uint64_t WriteCount = 0;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex);
+ for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
+ {
+ if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
+ {
+ WriteCount++;
+ }
+ }
+ return WriteCount;
+ };
+
void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash)
{
ZEN_TRACE_CPU("FinalizeChunkSequence");
@@ -3892,8 +3959,39 @@ namespace {
}
}
+ void VerifySequence(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ uint32_t RemoteSequenceIndex)
+ {
+ ZEN_TRACE_CPU("VerifySequence");
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const std::uint32_t RemotePathIndex = Lookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ExpectedSize = RemoteContent.RawSizes[RemotePathIndex];
+ IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash));
+ const uint64_t VerifySize = VerifyBuffer.GetSize();
+ if (VerifySize != ExpectedSize)
+ {
+ throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}",
+ SequenceRawHash,
+ VerifySize,
+ ExpectedSize));
+ }
+ ZEN_TRACE_CPU("HashSequence");
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ }
+ }
+ }
+
void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder,
const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
std::span<const uint32_t> RemoteSequenceIndexes,
ParallellWork& Work,
WorkerThreadPool& VerifyPool)
@@ -3908,40 +4006,24 @@ namespace {
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
Work.ScheduleWork(
VerifyPool,
- [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
+ [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync");
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
+ if (!AbortFlag)
{
- ZEN_TRACE_CPU("HashSequence");
- const IoHash VerifyChunkHash = IoHash::HashBuffer(
- IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
- FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
},
Work.DefaultErrorFunction());
}
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
+ VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("HashSequence");
- const IoHash VerifyChunkHash =
- IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
- }
- }
FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
@@ -3985,8 +4067,7 @@ namespace {
const BlockWriteOps& Ops,
ParallellWork& Work,
WorkerThreadPool& VerifyPool,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockChunkOps");
{
@@ -4017,12 +4098,6 @@ namespace {
FileOffset,
RemoteContent.RawSizes[PathIndex]);
}
- WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
- WriteChunkStats.ChunkBytesWritten +=
- std::accumulate(Ops.ChunkBuffers.begin(),
- Ops.ChunkBuffers.end(),
- uint64_t(0),
- [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); });
}
if (!AbortFlag)
{
@@ -4036,7 +4111,7 @@ namespace {
CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
- VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool);
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool);
}
}
@@ -4162,8 +4237,7 @@ namespace {
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockToDisk");
@@ -4197,8 +4271,7 @@ namespace {
Ops,
Work,
VerifyPool,
- DiskStats,
- WriteChunkStats);
+ DiskStats);
return true;
}
return false;
@@ -4222,8 +4295,7 @@ namespace {
Ops,
Work,
VerifyPool,
- DiskStats,
- WriteChunkStats);
+ DiskStats);
return true;
}
return false;
@@ -4240,8 +4312,7 @@ namespace {
uint32_t LastIncludedBlockChunkIndex,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WritePartialBlockToDisk");
@@ -4267,8 +4338,7 @@ namespace {
Ops,
Work,
VerifyPool,
- DiskStats,
- WriteChunkStats);
+ DiskStats);
return true;
}
else
@@ -4355,8 +4425,7 @@ namespace {
void StreamDecompress(const std::filesystem::path& CacheFolderPath,
const IoHash& SequenceRawHash,
CompositeBuffer&& CompressedPart,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("StreamDecompress");
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
@@ -4390,7 +4459,6 @@ namespace {
DiskStats.ReadByteCount += SourceSize;
if (!AbortFlag)
{
- WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize();
DecompressedTemp.Write(RangeBuffer, Offset);
for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
{
@@ -4424,7 +4492,7 @@ namespace {
throw std::runtime_error(
fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
}
- WriteChunkStats.ChunkCountWritten++;
+ // WriteChunkStats.ChunkCountWritten++;
}
bool WriteCompressedChunk(const std::filesystem::path& TargetFolder,
@@ -4433,8 +4501,7 @@ namespace {
const IoHash& ChunkHash,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
IoBuffer&& CompressedPart,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
@@ -4444,7 +4511,7 @@ namespace {
{
const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
- StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats);
+ StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats);
}
else
{
@@ -4459,8 +4526,6 @@ namespace {
ChunkTargetPtrs,
CompositeBuffer(std::move(Chunk)),
OpenFileCache);
- WriteChunkStats.ChunkCountWritten++;
- WriteChunkStats.ChunkBytesWritten += ChunkRawSize;
return true;
}
}
@@ -4479,8 +4544,7 @@ namespace {
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
FilteredRate& FilteredWrittenBytesPerSecond,
- DiskStatistics& DiskStats,
- WriteChunkStatistics& WriteChunkStats)
+ DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
@@ -4527,7 +4591,7 @@ namespace {
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
+ WritePool,
[&,
SequenceIndexChunksLeftToWriteCounters,
CompressedChunkPath,
@@ -4565,8 +4629,7 @@ namespace {
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
- DiskStats,
- WriteChunkStats);
+ DiskStats);
if (!AbortFlag)
{
WritePartsComplete++;
@@ -4581,7 +4644,12 @@ namespace {
CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
if (NeedHashVerify)
{
- VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool);
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ CompletedSequences,
+ Work,
+ WritePool);
}
else
{
@@ -4593,7 +4661,7 @@ namespace {
Work.DefaultErrorFunction());
};
- void UpdateFolder(BuildStorage& Storage,
+ void UpdateFolder(StorageInstance& Storage,
const Oid& BuildId,
const std::filesystem::path& Path,
const std::uint64_t LargeAttachmentSize,
@@ -4667,8 +4735,8 @@ namespace {
if (SequenceSize == CacheDirContent.FileSizes[Index])
{
CachedSequenceHashesFound.insert({FileHash, SequenceIndex});
- CacheMappingStats.CacheSequenceHashesCount += SequenceSize;
- CacheMappingStats.CacheSequenceHashesByteCount++;
+ CacheMappingStats.CacheSequenceHashesCount++;
+ CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize;
continue;
}
}
@@ -4869,21 +4937,17 @@ namespace {
NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount));
}
- uint32_t ChunkCountToWrite = 0;
+ uint64_t BytesToWrite = 0;
+
for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
{
- if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
+ if (ChunkWriteCount > 0)
{
- ChunkCountToWrite++;
- }
- else
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex);
- if (!ChunkTargetPtrs.empty())
+ BytesToWrite += RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount;
+ if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true;
- ChunkCountToWrite++;
}
}
}
@@ -4900,8 +4964,8 @@ namespace {
FilteredRate FilteredDownloadedBytesPerSecond;
FilteredRate FilteredWrittenBytesPerSecond;
- WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
- WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst);
+ WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
ProgressBar WriteProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
@@ -4922,7 +4986,7 @@ namespace {
const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
- ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
+ ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash);
continue;
}
bool NeedsCopy = true;
@@ -4933,7 +4997,7 @@ namespace {
if (ChunkTargetPtrs.empty())
{
- ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash);
+ ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash);
}
else
{
@@ -5025,6 +5089,10 @@ namespace {
uint32_t CurrentOffset =
gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(CurrentOffset));
+
BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
@@ -5064,6 +5132,7 @@ namespace {
}
ZEN_ASSERT(!BlockRanges.empty());
+
std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
auto It = BlockRanges.begin();
CollapsedBlockRanges.push_back(*It++);
@@ -5085,10 +5154,87 @@ namespace {
++It;
}
- TotalRequestCount += CollapsedBlockRanges.size();
- TotalPartWriteCount += CollapsedBlockRanges.size();
+ const std::uint64_t WantedSize = std::accumulate(
+ CollapsedBlockRanges.begin(),
+ CollapsedBlockRanges.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+ ZEN_ASSERT(WantedSize <= TotalBlockSize);
+ if (WantedSize > ((TotalBlockSize * 95) / 100))
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize));
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1)
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16))
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48))
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && (CollapsedBlockRanges.size() > 64))
+ {
+ ZEN_CONSOLE_VERBOSE("Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block",
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ FullBlockWorks.push_back(BlockIndex);
+ }
+ else
+ {
+ if (WantedSize > ((TotalBlockSize * 5) / 10))
+ {
+ ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block",
+ (WantedSize * 100) / TotalBlockSize,
+ NiceBytes(WantedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ CollapsedBlockRanges.size());
+ }
+ TotalRequestCount += CollapsedBlockRanges.size();
+ TotalPartWriteCount += CollapsedBlockRanges.size();
- BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end());
+ BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end());
+ }
}
else
{
@@ -5101,10 +5247,69 @@ namespace {
}
else
{
- ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
+ ZEN_CONSOLE_VERBOSE("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
}
}
+ struct BlobsExistsResult
+ {
+ tsl::robin_set<IoHash> ExistingBlobs;
+ uint64_t ElapsedTimeMs = 0;
+ };
+
+ BlobsExistsResult ExistsResult;
+
+ if (Storage.BuildCacheStorage)
+ {
+ ZEN_TRACE_CPU("BlobCacheExistCheck");
+ Stopwatch Timer;
+
+ tsl::robin_set<IoHash> BlobHashesSet;
+
+ BlobHashesSet.reserve(LooseChunkHashWorks.size() + FullBlockWorks.size());
+ for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
+ {
+ BlobHashesSet.insert(RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]);
+ }
+ for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks)
+ {
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ BlobHashesSet.insert(BlockDescription.BlockHash);
+ }
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ BlobHashesSet.insert(BlockDescription.BlockHash);
+ }
+
+ if (!BlobHashesSet.empty())
+ {
+ const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end());
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ Storage.BuildCacheStorage->BlobsExists(BuildId, BlobHashes);
+
+ if (CacheExistsResult.size() == BlobHashes.size())
+ {
+ ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
+ for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+ {
+ if (CacheExistsResult[BlobIndex].HasBody)
+ {
+ ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
+ }
+ }
+ }
+ ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (!ExistsResult.ExistingBlobs.empty())
+ {
+ ZEN_CONSOLE("Found {} out of {} needed blobs in remote cache in {}",
+ ExistsResult.ExistingBlobs.size(),
+ BlobHashes.size(),
+ NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
+ }
+ }
+ }
for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
{
if (AbortFlag)
@@ -5119,7 +5324,7 @@ namespace {
const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
Work.ScheduleWork(
- WritePool, // NetworkPool, // GetSyncWorkerPool(),//
+ WritePool,
[&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
@@ -5152,151 +5357,202 @@ namespace {
}
}
}
- if (!ExistingCompressedChunkPath.empty())
+ if (!AbortFlag)
+
{
- Work.ScheduleWork(
- WritePool, // WritePool, GetSyncWorkerPool()
- [&Path,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &WritePool,
- &DiskStats,
- &WriteChunkStats,
- &WritePartsComplete,
- &TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&Path,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
- FilteredWrittenBytesPerSecond.Start();
+ FilteredWrittenBytesPerSecond.Start();
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
- {
- throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
- ChunkHash,
- CompressedChunkPath));
- }
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}",
+ ChunkHash,
+ CompressedChunkPath));
+ }
- std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- std::move(CompressedPart),
- DiskStats,
- WriteChunkStats);
- WriteChunkStats.ChunkCountWritten++;
- WriteChunkStats.ChunkBytesWritten +=
- RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex];
- WritePartsComplete++;
+ std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ DiskStats);
+ WritePartsComplete++;
- if (!AbortFlag)
- {
- if (WritePartsComplete == TotalPartWriteCount)
+ if (!AbortFlag)
{
- FilteredWrittenBytesPerSecond.Stop();
- }
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
- std::filesystem::remove(CompressedChunkPath);
+ std::filesystem::remove(CompressedChunkPath);
- std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
- if (NeedHashVerify)
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ CompletedSequences,
+ Work,
+ WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ else
+ {
+ Work.ScheduleWork(
+ NetworkPool,
+ [&Path,
+ &Storage,
+ BuildId,
+ &RemoteContent,
+ &RemoteLookup,
+ &ExistsResult,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &NetworkPool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ LargeAttachmentSize,
+ PreferredMultipartChunkSize,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ &DownloadStats](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ FilteredDownloadedBytesPerSecond.Start();
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
- VerifyAndCompleteChunkSequencesAsync(TargetFolder,
- RemoteContent,
- CompletedSequences,
- Work,
- WritePool);
+ ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
+ DownloadLargeBlob(*Storage.BuildStorage,
+ Path / ZenTempDownloadFolderName,
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadStats,
+ [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (!AbortFlag)
+ {
+ AsyncWriteDownloadedChunk(
+ Path,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
+ });
}
else
{
- FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ ZEN_TRACE_CPU("UpdateFolder_GetChunk");
+ IoBuffer BuildBlob;
+ if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash))
+ {
+ BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
+ }
+ if (!BuildBlob)
+ {
+ BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash);
+ if (BuildBlob && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(
+ BuildId,
+ ChunkHash,
+ BuildBlob.GetContentType(),
+ CompositeBuffer(SharedBuffer(BuildBlob)));
+ }
+ }
+ if (!BuildBlob)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+ if (!AbortFlag)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(Path,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
}
}
- }
- },
- Work.DefaultErrorFunction());
- }
- else
- {
- FilteredDownloadedBytesPerSecond.Start();
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
- DownloadLargeBlob(Storage,
- Path / ZenTempDownloadFolderName,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(Path,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(Payload),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats,
- WriteChunkStats);
- });
- }
- else
- {
- ZEN_TRACE_CPU("UpdateFolder_GetChunk");
-
- IoBuffer BuildBlob = Storage.GetBuildBlob(BuildId, ChunkHash);
- if (!BuildBlob)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
- }
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(Path,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats,
- WriteChunkStats);
+ },
+ Work.DefaultErrorFunction());
}
}
}
@@ -5312,7 +5568,7 @@ namespace {
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
+ WritePool,
[&, CopyDataIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -5439,16 +5695,6 @@ namespace {
ChunkSource,
Op.Target->Offset,
RemoteContent.RawSizes[RemotePathIndex]);
- for (size_t WrittenOpIndex = WriteOpIndex; WrittenOpIndex < WriteOpIndex + WriteCount; WrittenOpIndex++)
- {
- const WriteOp& WrittenOp = WriteOps[WrittenOpIndex];
- if (ChunkIndexesWritten.insert(WrittenOp.ChunkIndex).second)
- {
- WriteChunkStats.ChunkCountWritten++;
- WriteChunkStats.ChunkBytesWritten +=
- RemoteContent.ChunkedContent.ChunkRawSizes[WrittenOp.ChunkIndex];
- }
- }
CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
@@ -5469,10 +5715,13 @@ namespace {
}
VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
RemoteContent,
+ RemoteLookup,
CompletedChunkSequences,
Work,
WritePool);
- ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ ZEN_CONSOLE_VERBOSE("Copied {} from {}",
+ NiceBytes(CacheLocalFileBytesRead),
+ LocalContent.Paths[LocalPathIndex]);
}
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
@@ -5492,7 +5741,7 @@ namespace {
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(), // WritePool,
+ WritePool,
[&, BlockIndex](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
@@ -5509,27 +5758,29 @@ namespace {
fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
}
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats,
- WriteChunkStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
- WritePartsComplete++;
- std::filesystem::remove(BlockChunkPath);
- if (WritePartsComplete == TotalPartWriteCount)
+ if (!AbortFlag)
{
- FilteredWrittenBytesPerSecond.Stop();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ std::filesystem::remove(BlockChunkPath);
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
}
},
@@ -5546,7 +5797,7 @@ namespace {
ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1);
const uint32_t BlockIndex = BlockRange.BlockIndex;
Work.ScheduleWork(
- NetworkPool, // NetworkPool, // GetSyncWorkerPool()
+ NetworkPool,
[&, BlockIndex, BlockRange](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -5555,131 +5806,148 @@ namespace {
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer =
- Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ IoBuffer BlockBuffer;
+ if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ {
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ }
if (!BlockBuffer)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
}
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ if (!BlockBuffer)
{
- FilteredDownloadedBytesPerSecond.Stop();
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
-
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ if (!AbortFlag)
{
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
{
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = Path / ZenTempBlockFolderName /
- fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
{
- BlockChunkPath = std::filesystem::path{};
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = Path / ZenTempBlockFolderName /
+ fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
}
}
}
- }
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath =
- Path / ZenTempBlockFolderName /
- fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = Path / ZenTempBlockFolderName /
+ fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool, // WritePool, // GetSyncWorkerPool(),
- [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
- std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
+ std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockPartialBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
+ if (BlockChunkPath.empty())
{
- throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
}
- }
- FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockToDisk(
- CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats,
- WriteChunkStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockToDisk(
+ CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
- if (!BlockChunkPath.empty())
- {
- std::filesystem::remove(BlockChunkPath);
- }
+ if (!BlockChunkPath.empty())
+ {
+ std::filesystem::remove(BlockChunkPath);
+ }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
}
},
@@ -5693,7 +5961,7 @@ namespace {
break;
}
Work.ScheduleWork(
- NetworkPool, // GetSyncWorkerPool(), // NetworkPool,
+ NetworkPool,
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -5702,133 +5970,152 @@ namespace {
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash);
+
+ IoBuffer BlockBuffer;
+ if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ {
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ }
if (!BlockBuffer)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ if (BlockBuffer && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockBuffer.GetContentType(),
+ CompositeBuffer(SharedBuffer(BlockBuffer)));
+ }
}
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ if (!BlockBuffer)
{
- FilteredDownloadedBytesPerSecond.Stop();
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
+ if (!AbortFlag)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- std::filesystem::path BlockChunkPath;
+ std::filesystem::path BlockChunkPath;
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
{
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
{
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
{
- BlockChunkPath = std::filesystem::path{};
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
}
}
}
- }
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString();
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool, // WritePool, GetSyncWorkerPool()
- [&Work,
- &WritePool,
- &RemoteContent,
- &RemoteLookup,
- CacheFolderPath,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- BlockIndex,
- &BlockDescriptions,
- &WriteChunkStats,
- &DiskStats,
- &WritePartsComplete,
- &TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&Work,
+ &WritePool,
+ &RemoteContent,
+ &RemoteLookup,
+ CacheFolderPath,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ BlockIndex,
+ &BlockDescriptions,
+ &WriteChunkStats,
+ &DiskStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockChunkPath,
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockBuffer);
- BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
+ if (BlockChunkPath.empty())
{
- throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ ZEN_ASSERT(BlockBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
}
- }
- FilteredWrittenBytesPerSecond.Start();
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats,
- WriteChunkStats))
- {
- std::error_code DummyEc;
- std::filesystem::remove(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
- if (!BlockChunkPath.empty())
- {
- std::filesystem::remove(BlockChunkPath);
- }
+ if (!BlockChunkPath.empty())
+ {
+ std::filesystem::remove(BlockChunkPath);
+ }
- WritePartsComplete++;
+ WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
}
},
@@ -5840,27 +6127,28 @@ namespace {
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, PendingWork);
- ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load());
uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
DownloadStats.DownloadedBlockByteCount.load() +
+DownloadStats.DownloadedPartialBlockByteCount.load();
FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load());
FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
- std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
+ std::string DownloadRateString =
+ (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ ? ""
+ : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
+ std::string Details = fmt::format("{}/{} ({}{}) downloaded. {}/{} ({}B/s) written.",
DownloadStats.RequestsCompleteCount.load(),
TotalRequestCount,
NiceBytes(DownloadedBytes),
- NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- WriteChunkStats.ChunkCountWritten.load(),
- ChunkCountToWrite,
+ DownloadRateString,
NiceBytes(DiskStats.WriteByteCount.load()),
+ NiceBytes(BytesToWrite),
NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
- WriteProgressBar.UpdateState(
- {.Task = "Writing chunks ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite),
- .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())},
- false);
+ WriteProgressBar.UpdateState({.Task = "Writing chunks ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(BytesToWrite),
+ .RemainingCount = gsl::narrow<uint64_t>(BytesToWrite - DiskStats.WriteByteCount.load())},
+ false);
});
}
@@ -5981,7 +6269,7 @@ namespace {
ZEN_TRACE_CPU("UpdateFolder_FinalizeTree");
Stopwatch Timer;
- WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst);
ProgressBar RebuildProgressBar(UsePlainProgress);
ParallellWork Work(AbortFlag);
@@ -6019,7 +6307,7 @@ namespace {
}
Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
+ WritePool,
[&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -6070,6 +6358,10 @@ namespace {
TargetsComplete++;
while (TargetOffset < (BaseTargetOffset + TargetCount))
{
+ if (AbortFlag)
+ {
+ break;
+ }
ZEN_TRACE_CPU("FinalizeTree_Copy");
ZEN_ASSERT(Targets[TargetOffset].first == RawHash);
@@ -6140,17 +6432,15 @@ namespace {
std::vector<std::pair<Oid, std::string>> Result;
{
Stopwatch GetBuildTimer;
-
- std::vector<std::pair<Oid, std::string>> AvailableParts;
-
- CbObject BuildObject = Storage.GetBuild(BuildId);
-
- ZEN_CONSOLE("GetBuild took {}. Name: '{}', Payload size: {}",
+ CbObject BuildObject = Storage.GetBuild(BuildId);
+ ZEN_CONSOLE("GetBuild took {}. Name: '{}' ({}, {}), Payload size: {}",
NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()),
- BuildObject["BuildName"sv].AsString(),
+ BuildObject["name"sv].AsString(),
+ BuildObject["type"sv].AsString(),
+ BuildObject["Configuration"sv].AsString(),
NiceBytes(BuildObject.GetSize()));
- ZEN_DEBUG("Build object: {}", BuildObject);
+ ZEN_CONSOLE_VERBOSE("Build object: {}", BuildObject);
CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
if (!PartsObject)
@@ -6160,6 +6450,8 @@ namespace {
OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize);
+ std::vector<std::pair<Oid, std::string>> AvailableParts;
+
for (CbFieldView PartView : PartsObject)
{
const std::string BuildPartName = std::string(PartView.GetName());
@@ -6221,7 +6513,7 @@ namespace {
return Result;
}
- ChunkedFolderContent GetRemoteContent(BuildStorage& Storage,
+ ChunkedFolderContent GetRemoteContent(StorageInstance& Storage,
const Oid& BuildId,
const std::vector<std::pair<Oid, std::string>>& BuildParts,
std::unique_ptr<ChunkingController>& OutChunkController,
@@ -6234,7 +6526,7 @@ namespace {
Stopwatch GetBuildPartTimer;
const Oid BuildPartId = BuildParts[0].first;
const std::string_view BuildPartName = BuildParts[0].second;
- CbObject BuildPartManifest = Storage.GetBuildPart(BuildId, BuildPartId);
+ CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId);
ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}",
BuildPartId,
BuildPartName,
@@ -6248,7 +6540,7 @@ namespace {
OutChunkController = CreateChunkingController(ChunkerName, Parameters);
}
- auto ParseBuildPartManifest = [](BuildStorage& Storage,
+ auto ParseBuildPartManifest = [](StorageInstance& Storage,
const Oid& BuildId,
const Oid& BuildPartId,
CbObject BuildPartManifest,
@@ -6274,12 +6566,103 @@ namespace {
// TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them
- Stopwatch GetBlockMetadataTimer;
- OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes);
- ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks",
- BuildPartId,
- NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
- OutBlockDescriptions.size());
+ {
+ Stopwatch GetBlockMetadataTimer;
+
+ std::vector<ChunkBlockDescription> UnorderedList;
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
+ if (Storage.BuildCacheStorage)
+ {
+ std::vector<CbObject> CacheBlockMetadatas = Storage.BuildCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes);
+ UnorderedList.reserve(CacheBlockMetadatas.size());
+ for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size();
+ CacheBlockMetadataIndex++)
+ {
+ const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex];
+ ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata);
+ if (Description.BlockHash == IoHash::Zero)
+ {
+ ZEN_WARN("Unexpected/invalid block metadata received from remote cache, skipping block");
+ }
+ else
+ {
+ UnorderedList.emplace_back(std::move(Description));
+ }
+ }
+ for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
+ {
+ const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
+ }
+ }
+
+ if (UnorderedList.size() < BlockRawHashes.size())
+ {
+ std::vector<IoHash> RemainingBlockHashes;
+ RemainingBlockHashes.reserve(BlockRawHashes.size() - UnorderedList.size());
+ for (const IoHash& BlockRawHash : BlockRawHashes)
+ {
+ if (!BlockDescriptionLookup.contains(BlockRawHash))
+ {
+ RemainingBlockHashes.push_back(BlockRawHash);
+ }
+ }
+ CbObject BlockMetadatas = Storage.BuildStorage->GetBlockMetadatas(BuildId, RemainingBlockHashes);
+ std::vector<ChunkBlockDescription> RemainingList;
+ {
+ CbArrayView BlocksArray = BlockMetadatas["blocks"sv].AsArrayView();
+ std::vector<IoHash> FoundBlockHashes;
+ std::vector<CbObject> FoundBlockMetadatas;
+ for (CbFieldView Block : BlocksArray)
+ {
+ ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView());
+
+ if (Description.BlockHash == IoHash::Zero)
+ {
+ ZEN_WARN("Unexpected/invalid block metadata received from remote store, skipping block");
+ }
+ else
+ {
+ if (Storage.BuildCacheStorage)
+ {
+ UniqueBuffer MetaBuffer = UniqueBuffer::Alloc(Block.GetSize());
+ Block.CopyTo(MetaBuffer.GetMutableView());
+ CbObject BlockMetadata(MetaBuffer.MoveToShared());
+
+ FoundBlockHashes.push_back(Description.BlockHash);
+ FoundBlockMetadatas.push_back(BlockMetadata);
+ }
+ RemainingList.emplace_back(std::move(Description));
+ }
+ }
+ if (Storage.BuildCacheStorage && !FoundBlockHashes.empty())
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, FoundBlockHashes, FoundBlockMetadatas);
+ }
+ }
+
+ for (size_t DescriptionIndex = 0; DescriptionIndex < RemainingList.size(); DescriptionIndex++)
+ {
+ const ChunkBlockDescription& Description = RemainingList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size() + DescriptionIndex);
+ }
+ UnorderedList.insert(UnorderedList.end(), RemainingList.begin(), RemainingList.end());
+ }
+
+ OutBlockDescriptions.reserve(BlockDescriptionLookup.size());
+ for (const IoHash& BlockHash : BlockRawHashes)
+ {
+ if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end())
+ {
+ OutBlockDescriptions.push_back(std::move(UnorderedList[It->second]));
+ }
+ }
+
+ ZEN_CONSOLE("GetBlockMetadata for {} took {}. Found {} blocks",
+ BuildPartId,
+ NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
+ OutBlockDescriptions.size());
+ }
if (OutBlockDescriptions.size() != BlockRawHashes.size())
{
@@ -6292,7 +6675,8 @@ namespace {
ZEN_CONSOLE("{} Attemping fallback options.", ErrorDescription);
std::vector<ChunkBlockDescription> AugmentedBlockDescriptions;
AugmentedBlockDescriptions.reserve(BlockRawHashes.size());
- std::vector<ChunkBlockDescription> FoundBlocks = Storage.FindBlocks(BuildId);
+ std::vector<ChunkBlockDescription> FoundBlocks =
+ ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId));
for (const IoHash& BlockHash : BlockRawHashes)
{
@@ -6315,7 +6699,7 @@ namespace {
}
else
{
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash);
+ IoBuffer BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockHash);
if (!BlockBuffer)
{
throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash));
@@ -6381,7 +6765,7 @@ namespace {
const Oid& OverlayBuildPartId = BuildParts[PartIndex].first;
const std::string& OverlayBuildPartName = BuildParts[PartIndex].second;
Stopwatch GetOverlayBuildPartTimer;
- CbObject OverlayBuildPartManifest = Storage.GetBuildPart(BuildId, OverlayBuildPartId);
+ CbObject OverlayBuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, OverlayBuildPartId);
ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}",
OverlayBuildPartId,
OverlayBuildPartName,
@@ -6486,9 +6870,11 @@ namespace {
Path,
std::move(IsAcceptedFolder),
std::move(IsAcceptedFile),
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
UsePlainProgress ? 5000 : 200,
- [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); },
+ [&](bool, std::ptrdiff_t) {
+ ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path);
+ },
AbortFlag);
if (AbortFlag)
{
@@ -6557,7 +6943,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
Path,
UpdatedContent,
ChunkController,
@@ -6609,8 +6995,6 @@ namespace {
{
LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths);
}
-
- ZEN_CONSOLE("Using cached local state");
}
ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs()));
ScanContent = false;
@@ -6636,7 +7020,7 @@ namespace {
FilteredBytesHashed.Start();
ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
ChunkingStats,
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
Path,
CurrentLocalFolderContent,
ChunkController,
@@ -6670,7 +7054,7 @@ namespace {
return LocalContent;
}
- void DownloadFolder(BuildStorage& Storage,
+ void DownloadFolder(StorageInstance& Storage,
const Oid& BuildId,
const std::vector<Oid>& BuildPartIds,
std::span<const std::string> BuildPartNames,
@@ -6694,7 +7078,7 @@ namespace {
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
std::vector<std::pair<Oid, std::string>> AllBuildParts =
- ResolveBuildPartNames(Storage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize);
+ ResolveBuildPartNames(*Storage.BuildStorage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize);
std::vector<ChunkedFolderContent> PartContents;
@@ -6786,7 +7170,7 @@ namespace {
{
BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
}
- ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView());
+ ZEN_CONSOLE("Downloading build {}, parts:{} to '{}'", BuildId, BuildPartString.ToView(), Path);
FolderContent LocalFolderState;
DiskStatistics DiskStats;
@@ -7131,6 +7515,10 @@ BuildsCommand::BuildsCommand()
"<jsonmetadata>");
};
+ auto AddCacheOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>");
+ };
+
auto AddOutputOptions = [this](cxxopts::Options& Ops) {
Ops.add_option("output", "", "plain-progress", "Show progress using plain output", cxxopts::value(m_PlainProgress), "<progress>");
Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>");
@@ -7169,6 +7557,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_UploadOptions);
AddFileOptions(m_UploadOptions);
AddOutputOptions(m_UploadOptions);
+ AddCacheOptions(m_UploadOptions);
m_UploadOptions.add_options()("h,help", "Print help");
m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>");
m_UploadOptions.add_option("",
@@ -7232,6 +7621,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_DownloadOptions);
AddFileOptions(m_DownloadOptions);
AddOutputOptions(m_DownloadOptions);
+ AddCacheOptions(m_DownloadOptions);
m_DownloadOptions.add_options()("h,help", "Print help");
m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>");
m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
@@ -7284,6 +7674,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_TestOptions);
AddFileOptions(m_TestOptions);
AddOutputOptions(m_TestOptions);
+ AddCacheOptions(m_TestOptions);
m_TestOptions.add_options()("h,help", "Print help");
m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_TestOptions.add_option("",
@@ -7304,6 +7695,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_FetchBlobOptions);
AddFileOptions(m_FetchBlobOptions);
AddOutputOptions(m_FetchBlobOptions);
+ AddCacheOptions(m_FetchBlobOptions);
m_FetchBlobOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
m_FetchBlobOptions
.add_option("", "", "blob-hash", "IoHash in hex form identifying the blob to download", cxxopts::value(m_BlobHash), "<blob-hash>");
@@ -7333,6 +7725,7 @@ BuildsCommand::BuildsCommand()
AddCloudOptions(m_MultiTestDownloadOptions);
AddFileOptions(m_MultiTestDownloadOptions);
AddOutputOptions(m_MultiTestDownloadOptions);
+ AddCacheOptions(m_MultiTestDownloadOptions);
m_MultiTestDownloadOptions
.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>");
@@ -7385,6 +7778,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help()));
}
}
+ else if (m_StoragePath.empty())
+ {
+ throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help()));
+ }
};
std::unique_ptr<AuthMgr> Auth;
@@ -7393,7 +7790,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
auto CreateAuthMgr = [&]() {
if (!Auth)
{
- std::filesystem::path DataRoot = m_SystemRootDir.empty() ? PickDefaultSystemRootDirectory() : StringToPath(m_SystemRootDir);
+ std::filesystem::path DataRoot = m_SystemRootDir.empty()
+ ? PickDefaultSystemRootDirectory()
+ : std::filesystem::absolute(StringToPath(m_SystemRootDir)).make_preferred();
if (m_EncryptionKey.empty())
{
@@ -7448,7 +7847,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else if (!m_AccessTokenPath.empty())
{
- std::string ResolvedAccessToken = ReadAccessTokenFromFile(StringToPath(m_AccessTokenPath));
+ std::string ResolvedAccessToken =
+ ReadAccessTokenFromFile(std::filesystem::absolute(StringToPath(m_AccessTokenPath)).make_preferred());
if (!ResolvedAccessToken.empty())
{
ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken);
@@ -7486,15 +7886,63 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
};
ParseOutputOptions();
+ auto CreateBuildStorage = [&](BuildStorage::Statistics& StorageStats,
+ BuildStorageCache::Statistics& StorageCacheStats,
+ const std::filesystem::path& TempPath) -> StorageInstance {
+ ParseStorageOptions();
+
+ StorageInstance Result;
+
+ if (!m_BuildsUrl.empty())
+ {
+ ParseAuthOptions();
+ Result.BuildStorageHttp = std::make_unique<HttpClient>(m_BuildsUrl, ClientSettings);
+ ZEN_CONSOLE("Using cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
+ m_BuildsUrl,
+ Result.BuildStorageHttp->GetSessionId(),
+ m_Namespace,
+ m_Bucket);
+ Result.BuildStorage =
+ CreateJupiterBuildStorage(Log(), *Result.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, TempPath / "storage");
+ Result.StorageName = ZEN_CLOUD_STORAGE;
+ }
+ else if (!m_StoragePath.empty())
+ {
+ std::filesystem::path StoragePath = std::filesystem::absolute(StringToPath(m_StoragePath)).make_preferred();
+ ZEN_CONSOLE("Using folder {}", StoragePath);
+ Result.BuildStorage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
+ Result.StorageName = fmt::format("Disk {}", StoragePath.stem());
+ }
+ else
+ {
+ throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
+ }
+ if (!m_ZenCacheHost.empty())
+ {
+ Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost,
+ HttpClientSettings{.LogCategory = "httpcacheclient",
+ .ConnectTimeout = std::chrono::milliseconds{3000},
+ .Timeout = std::chrono::milliseconds{30000},
+ .AssumeHttp2 = false,
+ .AllowResume = true,
+ .RetryCount = 0});
+ if (Result.CacheHttp->Get("/health").IsSuccess())
+ {
+ Result.BuildCacheStorage =
+ CreateZenBuildStorageCache(*Result.CacheHttp, StorageCacheStats, m_Namespace, m_Bucket, TempPath / "zencache");
+ }
+ else
+ {
+ Result.CacheHttp.reset();
+ }
+ }
+ return Result;
+ };
+
try
{
if (SubOption == &m_ListOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
CbObject QueryObject;
if (m_ListQueryPath.empty())
{
@@ -7505,7 +7953,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else
{
- std::filesystem::path ListQueryPath = StringToPath(m_ListQueryPath);
+ std::filesystem::path ListQueryPath = std::filesystem::absolute(StringToPath(m_ListQueryPath)).make_preferred();
if (ToLower(ListQueryPath.extension().string()) == ".cbo")
{
QueryObject = LoadCompactBinaryObject(IoBufferBuilder::MakeFromFile(ListQueryPath));
@@ -7525,28 +7973,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE_VERBOSE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}'",
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{});
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE_VERBOSE("Querying builds in folder '{}'.", StoragePath);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
- CbObject Response = Storage->ListBuilds(QueryObject);
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderName);
+
+ CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject);
ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None);
if (m_ListResultPath.empty())
{
@@ -7556,7 +7987,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else
{
- std::filesystem::path ListResultPath = StringToPath(m_ListResultPath);
+ std::filesystem::path ListResultPath = std::filesystem::absolute(StringToPath(m_ListResultPath)).make_preferred();
if (ToLower(ListResultPath.extension().string()) == ".cbo")
{
MemoryView ResponseView = Response.GetView();
@@ -7575,11 +8006,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_UploadOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
if (m_Path.empty())
{
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help()));
@@ -7610,7 +8036,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path));
if (m_BuildPartName.empty())
{
@@ -7645,48 +8071,20 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help()));
}
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
+ const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
+
BuildStorage::Statistics StorageStats;
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'",
- m_BuildPartName,
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- GeneratedBuildId ? "Generated " : "",
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'",
- m_BuildPartName,
- Path,
- StoragePath,
- GeneratedBuildId ? "Generated " : "",
- BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
CbObject MetaData;
if (m_CreateBuild)
{
if (!m_BuildMetadataPath.empty())
{
- std::filesystem::path MetadataPath = StringToPath(m_BuildMetadataPath);
+ std::filesystem::path MetadataPath = std::filesystem::absolute(StringToPath(m_BuildMetadataPath));
IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten();
std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize());
std::string JsonError;
@@ -7713,12 +8111,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- UploadFolder(*Storage,
+ UploadFolder(Storage,
BuildId,
BuildPartId,
m_BuildPartName,
Path,
- StringToPath(m_ManifestPath),
+ std::filesystem::absolute(StringToPath(m_ManifestPath)).make_preferred(),
m_BlockReuseMinPercentLimit,
m_AllowMultiparts,
MetaData,
@@ -7735,7 +8133,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
"Requests: {}\n"
"Avg Request Time: {}\n"
"Avg I/O Time: {}",
- StorageName,
+ Storage.StorageName,
NiceBytes(StorageStats.TotalBytesRead.load()),
NiceBytes(StorageStats.TotalBytesWritten.load()),
StorageStats.TotalRequestCount.load(),
@@ -7751,11 +8149,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_DownloadOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
if (m_Path.empty())
{
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
@@ -7786,37 +8179,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Downloading '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- BuildId,
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, Path, StoragePath, BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId,
BuildPartIds,
m_BuildPartNames,
@@ -7835,7 +8205,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
"Requests: {}\n"
"Avg Request Time: {}\n"
"Avg I/O Time: {}",
- StorageName,
+ Storage.StorageName,
NiceBytes(StorageStats.TotalBytesRead.load()),
NiceBytes(StorageStats.TotalBytesWritten.load()),
StorageStats.TotalRequestCount.load(),
@@ -7859,8 +8229,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help()));
}
- std::filesystem::path Path = StringToPath(m_Path);
- std::filesystem::path DiffPath = StringToPath(m_DiffPath);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
+ std::filesystem::path DiffPath = std::filesystem::absolute(StringToPath(m_DiffPath)).make_preferred();
DiffFolders(Path, DiffPath, m_OnlyChunked);
return AbortFlag ? 11 : 0;
}
@@ -7872,10 +8242,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
}
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
// m_StoragePath = "D:\\buildstorage";
// m_Path = "F:\\Saved\\DownloadedBuilds\\++Fortnite+Main-CL-XXXXXXXX\\WindowsClient";
// std::vector<std::string> BuildIdStrings{"07d3942f0e7f4ca1b13b0587",
@@ -7886,34 +8252,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
// "07d3964f919d577a321a1fdd",
// "07d396a6ce875004e16b9528"};
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Downloading {} to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'",
- FormatArray<std::string>(m_BuildIds, " "sv),
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Downloading {}'to '{}' from folder {}", FormatArray<std::string>(m_BuildIds, " "sv), Path, StoragePath);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
Stopwatch Timer;
for (const std::string& BuildIdString : m_BuildIds)
@@ -7923,7 +8267,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help()));
}
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId,
{},
{},
@@ -7945,36 +8289,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_TestOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
if (m_Path.empty())
{
throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help()));
}
- std::filesystem::path Path = StringToPath(m_Path);
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
m_BuildId = Oid::NewOid().ToString();
m_BuildPartName = Path.filename().string();
m_BuildPartId = Oid::NewOid().ToString();
m_CreateBuild = true;
- BuildStorage::Statistics StorageStats;
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
+ const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
+ std::filesystem::path StoragePath = std::filesystem::absolute(StringToPath(m_StoragePath)).make_preferred();
if (m_BuildsUrl.empty() && StoragePath.empty())
{
StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred();
CreateDirectories(StoragePath);
CleanDirectory(StoragePath, {});
+ m_StoragePath = StoragePath.generic_string();
}
auto _ = MakeGuard([&]() {
if (m_BuildsUrl.empty() && StoragePath.empty())
@@ -7983,33 +8320,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
});
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- Path,
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!StoragePath.empty())
- {
- ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'",
- m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
- Path,
- StoragePath,
- BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ BuildStorage::Statistics StorageStats;
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
auto MakeMetaData = [](const Oid& BuildId) -> CbObject {
CbObjectWriter BuildMetaDataWriter;
@@ -8032,7 +8346,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView());
}
- UploadFolder(*Storage,
+ UploadFolder(Storage,
BuildId,
BuildPartId,
m_BuildPartName,
@@ -8052,7 +8366,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
+ DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
if (AbortFlag)
{
ZEN_CONSOLE("Download failed.");
@@ -8064,7 +8378,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (identical target)");
@@ -8115,7 +8429,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SourceSize > 256)
{
Work.ScheduleWork(
- GetMediumWorkerPool(EWorkloadType::Burst),
+ SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst),
[SourceSize, FilePath](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -8168,7 +8482,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (scrambled target)");
@@ -8187,7 +8501,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView());
}
- UploadFolder(*Storage,
+ UploadFolder(Storage,
BuildId2,
BuildPartId2,
m_BuildPartName,
@@ -8206,7 +8520,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
+ DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -8214,7 +8528,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId2,
{BuildPartId2},
{},
@@ -8230,7 +8544,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage,
+ DownloadFolder(Storage,
BuildId2,
{BuildPartId2},
{},
@@ -8250,11 +8564,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_FetchBlobOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
-
if (m_BlobHash.empty())
{
throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help()));
@@ -8266,44 +8575,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help()));
}
- if (m_BuildsUrl.empty() && m_StoragePath.empty())
- {
- throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help()));
- }
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
- BuildStorage::Statistics StorageStats;
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
- std::filesystem::path Path = StringToPath(m_Path);
+ BuildStorage::Statistics StorageStats;
+ BuildStorageCache::Statistics StorageCacheStats;
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
uint64_t CompressedSize;
uint64_t DecompressedSize;
- ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize);
+ ValidateBlob(*Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize);
if (AbortFlag)
{
return 11;
@@ -8317,10 +8600,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_ValidateBuildPartOptions)
{
- ParseStorageOptions();
- ParseAuthOptions();
-
- HttpClient Http(m_BuildsUrl, ClientSettings);
+ // HttpClient Http(m_BuildsUrl, ClientSettings);
if (m_BuildsUrl.empty() && m_StoragePath.empty())
{
@@ -8342,39 +8622,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help()));
}
+ std::filesystem::path Path = std::filesystem::absolute(StringToPath(m_Path)).make_preferred();
+
BuildStorage::Statistics StorageStats;
- std::unique_ptr<BuildStorage> Storage;
- std::string StorageName;
+ BuildStorageCache::Statistics StorageCacheStats;
- std::filesystem::path Path = StringToPath(m_Path);
+ StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, Path / ZenTempFolderName);
- if (!m_BuildsUrl.empty())
- {
- ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
- m_BuildsUrl,
- Http.GetSessionId(),
- m_Namespace,
- m_Bucket,
- BuildId);
- Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName);
- StorageName = "Cloud DDC";
- }
- else if (!m_StoragePath.empty())
- {
- std::filesystem::path StoragePath = StringToPath(m_StoragePath);
- ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId);
- Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- StorageName = fmt::format("Disk {}", StoragePath.stem());
- }
- else
- {
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
- }
Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId);
ValidateStatistics ValidateStats;
DownloadStatistics DownloadStats;
- ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
+ ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats);
return AbortFlag ? 13 : 0;
}
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index 1634975c1..b5af236e1 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -40,6 +40,9 @@ private:
std::string m_StoragePath;
bool m_WriteMetadataAsJson = false;
+ // cache
+ std::string m_ZenCacheHost;
+
std::string m_BuildId;
bool m_CreateBuild = false;
std::string m_BuildMetadataPath;
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 30711a432..fe5232d89 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -45,6 +45,7 @@ namespace detail {
TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {}
~TempPayloadFile()
{
+ ZEN_TRACE_CPU("TempPayloadFile::Close");
try
{
if (m_FileHandle)
@@ -87,6 +88,7 @@ namespace detail {
std::error_code Open(const std::filesystem::path& TempFolderPath)
{
+ ZEN_TRACE_CPU("TempPayloadFile::Open");
ZEN_ASSERT(m_FileHandle == nullptr);
std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) |
@@ -131,6 +133,7 @@ namespace detail {
std::error_code Write(std::string_view DataString)
{
+ ZEN_TRACE_CPU("TempPayloadFile::Write");
const uint8_t* DataPtr = (const uint8_t*)DataString.data();
size_t DataSize = DataString.size();
if (DataSize >= CacheBufferSize)
@@ -165,6 +168,7 @@ namespace detail {
IoBuffer DetachToIoBuffer()
{
+ ZEN_TRACE_CPU("TempPayloadFile::DetachToIoBuffer");
if (std::error_code Ec = Flush(); Ec)
{
ThrowSystemError(Ec.value(), Ec.message());
@@ -180,6 +184,7 @@ namespace detail {
IoBuffer BorrowIoBuffer()
{
+ ZEN_TRACE_CPU("TempPayloadFile::BorrowIoBuffer");
if (std::error_code Ec = Flush(); Ec)
{
ThrowSystemError(Ec.value(), Ec.message());
@@ -193,6 +198,7 @@ namespace detail {
uint64_t GetSize() const { return m_WriteOffset; }
void ResetWritePos(uint64_t WriteOffset)
{
+ ZEN_TRACE_CPU("TempPayloadFile::ResetWritePos");
Flush();
m_WriteOffset = WriteOffset;
}
@@ -200,6 +206,7 @@ namespace detail {
private:
std::error_code Flush()
{
+ ZEN_TRACE_CPU("TempPayloadFile::Flush");
if (m_CacheBufferOffset == 0)
{
return {};
@@ -211,6 +218,7 @@ namespace detail {
std::error_code AppendData(const void* Data, uint64_t Size)
{
+ ZEN_TRACE_CPU("TempPayloadFile::AppendData");
ZEN_ASSERT(m_FileHandle != nullptr);
const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
@@ -314,7 +322,11 @@ CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffe
const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code);
if (HttpResponse.error)
{
- ZEN_WARN("HttpClient client error (session: {}): {}", SessionId, HttpResponse);
+ if (HttpResponse.error.code != cpr::ErrorCode::OPERATION_TIMEDOUT &&
+ HttpResponse.error.code != cpr::ErrorCode::CONNECTION_FAILURE && HttpResponse.error.code != cpr::ErrorCode::REQUEST_CANCELLED)
+ {
+ ZEN_WARN("HttpClient client error (session: {}): {}", SessionId, HttpResponse);
+ }
// Client side failure code
return HttpClient::Response{
@@ -376,6 +388,7 @@ ShouldRetry(const cpr::Response& Response)
static bool
ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile>& PayloadFile)
{
+ ZEN_TRACE_CPU("ValidatePayload");
IoBuffer ResponseBuffer = (Response.text.empty() && PayloadFile) ? PayloadFile->BorrowIoBuffer()
: IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size());
@@ -535,12 +548,14 @@ struct HttpClient::Impl : public RefCounted
inline cpr::Session* operator->() const { return CprSession; }
inline cpr::Response Get()
{
+ ZEN_TRACE_CPU("HttpClient::Impl::Get");
cpr::Response Result = CprSession->Get();
ZEN_TRACE("GET {}", Result);
return Result;
}
inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional<cpr::HeaderCallback>&& Header = {})
{
+ ZEN_TRACE_CPU("HttpClient::Impl::Download");
if (Header)
{
CprSession->SetHeaderCallback(std::move(Header.value()));
@@ -553,12 +568,14 @@ struct HttpClient::Impl : public RefCounted
}
inline cpr::Response Head()
{
+ ZEN_TRACE_CPU("HttpClient::Impl::Head");
cpr::Response Result = CprSession->Head();
ZEN_TRACE("HEAD {}", Result);
return Result;
}
inline cpr::Response Put(std::optional<cpr::ReadCallback>&& Read = {})
{
+ ZEN_TRACE_CPU("HttpClient::Impl::Put");
if (Read)
{
CprSession->SetReadCallback(std::move(Read.value()));
@@ -570,6 +587,7 @@ struct HttpClient::Impl : public RefCounted
}
inline cpr::Response Post(std::optional<cpr::ReadCallback>&& Read = {})
{
+ ZEN_TRACE_CPU("HttpClient::Impl::Post");
if (Read)
{
CprSession->SetReadCallback(std::move(Read.value()));
@@ -581,6 +599,7 @@ struct HttpClient::Impl : public RefCounted
}
inline cpr::Response Delete()
{
+ ZEN_TRACE_CPU("HttpClient::Impl::Delete");
cpr::Response Result = CprSession->Delete();
ZEN_TRACE("DELETE {}", Result);
return Result;
@@ -620,6 +639,7 @@ HttpClient::Impl::Impl(LoggerRef Log) : m_Log(Log)
HttpClient::Impl::~Impl()
{
+ ZEN_TRACE_CPU("HttpClient::Impl::~Impl");
m_SessionLock.WithExclusiveLock([&] {
for (auto CprSession : m_Sessions)
{
@@ -638,6 +658,7 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl,
const std::string_view SessionId,
std::optional<HttpClientAccessToken> AccessToken)
{
+ ZEN_TRACE_CPU("HttpClient::Impl::AllocSession");
cpr::Session* CprSession = nullptr;
m_SessionLock.WithExclusiveLock([&] {
if (!m_Sessions.empty())
@@ -694,6 +715,7 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl,
void
HttpClient::Impl::ReleaseSession(cpr::Session* CprSession)
{
+ ZEN_TRACE_CPU("HttpClient::Impl::ReleaseSession");
CprSession->SetUrl({});
CprSession->SetHeader({});
CprSession->SetBody({});
@@ -718,6 +740,7 @@ HttpClient::~HttpClient()
bool
HttpClient::Authenticate()
{
+ ZEN_TRACE_CPU("HttpClient::Authenticate");
std::optional<HttpClientAccessToken> Token = GetAccessToken();
if (!Token)
{
@@ -729,6 +752,7 @@ HttpClient::Authenticate()
const std::optional<HttpClientAccessToken>
HttpClient::GetAccessToken()
{
+ ZEN_TRACE_CPU("HttpClient::GetAccessToken");
if (!m_ConnectionSettings.AccessTokenProvider.has_value())
{
return {};
diff --git a/src/zenhttp/include/zenhttp/formatters.h b/src/zenhttp/include/zenhttp/formatters.h
index 74da9ab05..05a23d675 100644
--- a/src/zenhttp/include/zenhttp/formatters.h
+++ b/src/zenhttp/include/zenhttp/formatters.h
@@ -73,9 +73,11 @@ struct fmt::formatter<cpr::Response>
if (zen::IsHttpSuccessCode(Response.status_code))
{
return fmt::format_to(Ctx.out(),
- "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}",
+ "Url: {}, Status: {}, Error: '{}' ({}), Bytes: {}/{} (Up/Down), Elapsed: {}",
Response.url.str(),
Response.status_code,
+ Response.error.message,
+ int(Response.error.code),
Response.uploaded_bytes,
Response.downloaded_bytes,
NiceResponseTime.c_str());
@@ -92,29 +94,35 @@ struct fmt::formatter<cpr::Response>
zen::ExtendableStringBuilder<256> Sb;
std::string_view Json = Obj.ToJson(Sb).ToView();
- return fmt::format_to(Ctx.out(),
- "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'",
- Response.url.str(),
- Response.status_code,
- Response.uploaded_bytes,
- Response.downloaded_bytes,
- NiceResponseTime.c_str(),
- Json,
- Response.reason);
+ return fmt::format_to(
+ Ctx.out(),
+ "Url: {}, Status: {}, Error: '{}' ({}). Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'",
+ Response.url.str(),
+ Response.status_code,
+ Response.error.message,
+ int(Response.error.code),
+ Response.uploaded_bytes,
+ Response.downloaded_bytes,
+ NiceResponseTime.c_str(),
+ Json,
+ Response.reason);
}
else
{
zen::BodyLogFormatter Body(Response.text);
- return fmt::format_to(Ctx.out(),
- "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'",
- Response.url.str(),
- Response.status_code,
- Response.uploaded_bytes,
- Response.downloaded_bytes,
- NiceResponseTime.c_str(),
- Body.GetText(),
- Response.reason);
+ return fmt::format_to(
+ Ctx.out(),
+ "Url: {}, Status: {}, Error: '{}' ({}), Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'",
+ Response.url.str(),
+ Response.status_code,
+ Response.error.message,
+ int(Response.error.code),
+ Response.uploaded_bytes,
+ Response.downloaded_bytes,
+ NiceResponseTime.c_str(),
+ Body.GetText(),
+ Response.reason);
}
}
}
diff --git a/src/zenhttp/packageformat.cpp b/src/zenhttp/packageformat.cpp
index ae80851e4..9d423ecbc 100644
--- a/src/zenhttp/packageformat.cpp
+++ b/src/zenhttp/packageformat.cpp
@@ -279,11 +279,10 @@ FormatPackageMessageInternal(const CbPackage& Data, FormatFlags Flags, void* Tar
{
IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer();
ZEN_ASSERT(ObjIoBuffer.GetSize() > 0);
- ResponseBuffers.emplace_back(std::move(ObjIoBuffer));
-
*AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(),
.Flags = CbAttachmentEntry::kIsObject,
.AttachmentHash = Attachment.GetHash()};
+ ResponseBuffers.emplace_back(std::move(ObjIoBuffer));
}
else if (const CompositeBuffer& AttachmentBinary = Attachment.AsCompositeBinary())
{
@@ -500,30 +499,25 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
{
if (Entry.Flags & CbAttachmentEntry::kIsObject)
{
+ CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer)));
+ if (!CompBuf)
+ {
+ // First payload is always a compact binary object
+ MalformedAttachments.push_back(
+ std::make_pair(i,
+ fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}",
+ AttachmentBuffer.GetSize(),
+ Entry.AttachmentHash)));
+ }
+ CbObject AttachmentObject = LoadCompactBinaryObject(std::move(CompBuf));
if (i == 0)
{
- CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer)));
- if (CompBuf)
- {
- Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf)));
- }
- else
- {
- // First payload is always a compact binary object
- MalformedAttachments.push_back(
- std::make_pair(i,
- fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}",
- AttachmentBuffer.GetSize(),
- Entry.AttachmentHash)));
- }
+ // First payload is always a compact binary object
+ Package.SetObject(AttachmentObject);
}
else
{
- MalformedAttachments.push_back(std::make_pair(
- i,
- fmt::format("Invalid format, compressed object attachments are not currently supported (size {}) for {}",
- AttachmentBuffer.GetSize(),
- Entry.AttachmentHash)));
+ Attachments.emplace_back(CbAttachment(AttachmentObject, Entry.AttachmentHash));
}
}
else
@@ -547,17 +541,14 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
{
if (Entry.Flags & CbAttachmentEntry::kIsObject)
{
+ CbObject AttachmentObject = LoadCompactBinaryObject(AttachmentBuffer);
if (i == 0)
{
- Package.SetObject(LoadCompactBinaryObject(AttachmentBuffer));
+ Package.SetObject(AttachmentObject);
}
else
{
- MalformedAttachments.push_back(
- std::make_pair(i,
- fmt::format("Invalid format, object attachments are not currently supported (size {}) for {}",
- AttachmentBuffer.GetSize(),
- Entry.AttachmentHash)));
+ Attachments.emplace_back(CbAttachment(AttachmentObject, Entry.AttachmentHash));
}
}
else if (AttachmentSize > 0)
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 2888f5450..0da6e31ad 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -20,6 +20,7 @@
#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
+#include <zenstore/buildstore/buildstore.h>
#include <zenstore/cache/structuredcachestore.h>
#include <zenutil/workerpools.h>
#include "config.h"
@@ -105,6 +106,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
ZenCacheStore* CacheStore,
CidStore* CidStore,
ProjectStore* ProjectStore,
+ BuildStore* BuildStore,
const LogPaths& LogPaths,
const ZenServerOptions& ServerOptions)
: m_GcScheduler(Scheduler)
@@ -112,6 +114,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
, m_ProjectStore(ProjectStore)
+, m_BuildStore(BuildStore)
, m_LogPaths(LogPaths)
, m_ServerOptions(ServerOptions)
{
@@ -306,6 +309,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
Response << "Interval" << ToTimeSpan(State.Config.Interval);
Response << "MaxCacheDuration" << ToTimeSpan(State.Config.MaxCacheDuration);
Response << "MaxProjectStoreDuration" << ToTimeSpan(State.Config.MaxProjectStoreDuration);
+ Response << "MaxBuildStoreDuration" << ToTimeSpan(State.Config.MaxBuildStoreDuration);
Response << "CollectSmallObjects" << State.Config.CollectSmallObjects;
Response << "Enabled" << State.Config.Enabled;
Response << "DiskReserveSize" << NiceBytes(State.Config.DiskReserveSize);
@@ -401,6 +405,14 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
}
}
+ if (auto Param = Params.GetValue("maxbuildstoreduration"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<uint64_t>(Param))
+ {
+ GcParams.MaxBuildStoreDuration = std::chrono::seconds(Value.value());
+ }
+ }
+
if (auto Param = Params.GetValue("disksizesoftlimit"); Param.empty() == false)
{
if (auto Value = ParseInt<uint64_t>(Param))
@@ -782,6 +794,10 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
{
m_ProjectStore->Flush();
}
+ if (m_BuildStore)
+ {
+ m_BuildStore->Flush();
+ }
HttpReq.WriteResponse(HttpResponseCode::OK);
},
HttpVerb::kPost);
diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h
index 563c4f536..e7821dead 100644
--- a/src/zenserver/admin/admin.h
+++ b/src/zenserver/admin/admin.h
@@ -12,6 +12,7 @@ class JobQueue;
class ZenCacheStore;
class CidStore;
class ProjectStore;
+class BuildStore;
struct ZenServerOptions;
class HttpAdminService : public zen::HttpService
@@ -28,6 +29,7 @@ public:
ZenCacheStore* CacheStore,
CidStore* CidStore,
ProjectStore* ProjectStore,
+ BuildStore* BuildStore,
const LogPaths& LogPaths,
const ZenServerOptions& ServerOptions);
~HttpAdminService();
@@ -42,6 +44,7 @@ private:
ZenCacheStore* m_CacheStore;
CidStore* m_CidStore;
ProjectStore* m_ProjectStore;
+ BuildStore* m_BuildStore;
LogPaths m_LogPaths;
const ZenServerOptions& m_ServerOptions;
};
diff --git a/src/zenserver/buildstore/httpbuildstore.cpp b/src/zenserver/buildstore/httpbuildstore.cpp
new file mode 100644
index 000000000..06bfea423
--- /dev/null
+++ b/src/zenserver/buildstore/httpbuildstore.cpp
@@ -0,0 +1,526 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpbuildstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/trace.h>
+#include <zenhttp/packageformat.h>
+#include <zenstore/buildstore/buildstore.h>
+#include <zenutil/workerpools.h>
+
+#include <numeric>
+
+namespace zen {
+using namespace std::literals;
+
+ZEN_DEFINE_LOG_CATEGORY_STATIC(LogBuilds, "builds"sv);
+
+HttpBuildStoreService::HttpBuildStoreService(HttpStatsService& StatsService, BuildStore& Store)
+: m_Log(logging::Get("builds"))
+, m_StatsService(StatsService)
+, m_BuildStore(Store)
+{
+ Initialize();
+}
+
+HttpBuildStoreService::~HttpBuildStoreService()
+{
+}
+
+const char*
+HttpBuildStoreService::BaseUri() const
+{
+ return "/builds/";
+}
+
+void
+HttpBuildStoreService::Initialize()
+{
+ ZEN_LOG_INFO(LogBuilds, "Initializing Builds Service");
+
+ m_StatsService.RegisterHandler("builds", *this);
+
+ m_Router.AddPattern("namespace", "([[:alnum:]-_.]+)");
+ m_Router.AddPattern("bucket", "([[:alnum:]-_.]+)");
+ m_Router.AddPattern("buildid", "([[:xdigit:]]{24})");
+ m_Router.AddPattern("hash", "([[:xdigit:]]{40})");
+
+ m_Router.RegisterRoute(
+ "{namespace}/{bucket}/{buildid}/blobs/{hash}",
+ [this](HttpRouterRequest& Req) { PutBlobRequest(Req); },
+ HttpVerb::kPut);
+
+ m_Router.RegisterRoute(
+ "{namespace}/{bucket}/{buildid}/blobs/{hash}",
+ [this](HttpRouterRequest& Req) { GetBlobRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{namespace}/{bucket}/{buildid}/blobs/putBlobMetadata",
+ [this](HttpRouterRequest& Req) { PutMetadataRequest(Req); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{namespace}/{bucket}/{buildid}/blobs/getBlobMetadata",
+ [this](HttpRouterRequest& Req) { GetMetadatasRequest(Req); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{namespace}/{bucket}/{buildid}/blobs/exists",
+ [this](HttpRouterRequest& Req) { BlobsExistsRequest(Req); },
+ HttpVerb::kPost);
+}
+
+void
+HttpBuildStoreService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ ZEN_TRACE_CPU("HttpBuildStoreService::HandleRequest");
+ metrics::OperationTiming::Scope $(m_HttpRequests);
+
+ m_BuildStoreStats.RequestCount++;
+ if (m_Router.HandleRequest(Request) == false)
+ {
+ ZEN_LOG_WARN(LogBuilds, "No route found for {0}", Request.RelativeUri());
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv);
+ }
+}
+
+void
+HttpBuildStoreService::PutBlobRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("HttpBuildStoreService::PutBlobRequest");
+ HttpServerRequest& ServerRequest = Req.ServerRequest();
+ const std::string_view Namespace = Req.GetCapture(1);
+ const std::string_view Bucket = Req.GetCapture(2);
+ const std::string_view BuildId = Req.GetCapture(3);
+ const std::string_view Hash = Req.GetCapture(4);
+ ZEN_UNUSED(Namespace, Bucket, BuildId);
+ IoHash BlobHash;
+ if (!IoHash::TryParse(Hash, BlobHash))
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Invalid blob hash '{}'", Hash));
+ }
+ m_BuildStoreStats.BlobWriteCount++;
+ IoBuffer Payload = ServerRequest.ReadPayload();
+ if (!Payload)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Payload blob {} is empty", Hash));
+ }
+ if (Payload.GetContentType() != HttpContentType::kCompressedBinary)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Payload blob {} content type {} is invalid", Hash, ToString(Payload.GetContentType())));
+ }
+ m_BuildStore.PutBlob(BlobHash, ServerRequest.ReadPayload());
+ // ZEN_INFO("Stored blob {}. Size: {}", BlobHash, ServerRequest.ReadPayload().GetSize());
+ return ServerRequest.WriteResponse(HttpResponseCode::OK);
+}
+
+void
+HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("HttpBuildStoreService::GetBlobRequest");
+ HttpServerRequest& ServerRequest = Req.ServerRequest();
+ std::string_view Namespace = Req.GetCapture(1);
+ std::string_view Bucket = Req.GetCapture(2);
+ std::string_view BuildId = Req.GetCapture(3);
+ std::string_view Hash = Req.GetCapture(4);
+ ZEN_UNUSED(Namespace, Bucket, BuildId);
+ IoHash BlobHash;
+ if (!IoHash::TryParse(Hash, BlobHash))
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Invalid blob hash '{}'", Hash));
+ }
+ zen::HttpRanges Ranges;
+ bool HasRange = ServerRequest.TryGetRanges(Ranges);
+ if (Ranges.size() > 1)
+ {
+ // Only a single range is supported
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Multiple ranges in blob request is not supported");
+ }
+
+ m_BuildStoreStats.BlobReadCount++;
+ IoBuffer Blob = m_BuildStore.GetBlob(BlobHash);
+ if (!Blob)
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Blob with hash '{}' could not be found", Hash));
+ }
+ // ZEN_INFO("Fetched blob {}. Size: {}", BlobHash, Blob.GetSize());
+ m_BuildStoreStats.BlobHitCount++;
+ if (HasRange)
+ {
+ const HttpRange& Range = Ranges.front();
+ const uint64_t BlobSize = Blob.GetSize();
+ const uint64_t MaxBlobSize = Range.Start < BlobSize ? Range.Start - BlobSize : 0;
+ const uint64_t RangeSize = Min(Range.End - Range.Start + 1, MaxBlobSize);
+ if (Range.Start + RangeSize >= BlobSize)
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::NoContent);
+ }
+ Blob = IoBuffer(Blob, Range.Start, RangeSize);
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, ZenContentType::kBinary, Blob);
+ }
+ else
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, Blob.GetContentType(), Blob);
+ }
+}
+
+void
+HttpBuildStoreService::PutMetadataRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("HttpBuildStoreService::PutMetadataRequest");
+ HttpServerRequest& ServerRequest = Req.ServerRequest();
+ std::string_view Namespace = Req.GetCapture(1);
+ std::string_view Bucket = Req.GetCapture(2);
+ std::string_view BuildId = Req.GetCapture(3);
+
+ IoBuffer MetaPayload = ServerRequest.ReadPayload();
+ if (MetaPayload.GetContentType() != ZenContentType::kCbPackage)
+ {
+ throw std::runtime_error(fmt::format("PutMetadataRequest payload has unexpected payload type '{}', expected '{}'",
+ ToString(MetaPayload.GetContentType()),
+ ToString(ZenContentType::kCbPackage)));
+ }
+ CbPackage Message = ParsePackageMessage(MetaPayload);
+
+ CbObjectView MessageObject = Message.GetObject();
+ if (!MessageObject)
+ {
+ throw std::runtime_error("PutMetadataRequest payload object is missing");
+ }
+ CbArrayView BlobsArray = MessageObject["blobHashes"sv].AsArrayView();
+ CbArrayView MetadataArray = MessageObject["metadatas"sv].AsArrayView();
+
+ const uint64_t BlobCount = BlobsArray.Num();
+ if (BlobCount == 0)
+ {
+ throw std::runtime_error("PutMetadataRequest blobs array is empty");
+ }
+ if (BlobCount != MetadataArray.Num())
+ {
+ throw std::runtime_error(
+ fmt::format("PutMetadataRequest metadata array size {} does not match blobs array size {}", MetadataArray.Num(), BlobCount));
+ }
+
+ std::vector<IoHash> BlobHashes;
+ std::vector<IoBuffer> MetadataPayloads;
+
+ BlobHashes.reserve(BlobCount);
+ MetadataPayloads.reserve(BlobCount);
+
+ auto BlobsArrayIt = begin(BlobsArray);
+ auto MetadataArrayIt = begin(MetadataArray);
+ while (BlobsArrayIt != end(BlobsArray))
+ {
+ const IoHash BlobHash = (*BlobsArrayIt).AsHash();
+ const IoHash MetadataHash = (*MetadataArrayIt).AsAttachment();
+
+ const CbAttachment* Attachment = Message.FindAttachment(MetadataHash);
+ if (Attachment == nullptr)
+ {
+ throw std::runtime_error(fmt::format("Blob metadata attachment {} is missing", MetadataHash));
+ }
+ BlobHashes.push_back(BlobHash);
+ if (Attachment->IsObject())
+ {
+ MetadataPayloads.push_back(Attachment->AsObject().GetBuffer().MakeOwned().AsIoBuffer());
+ MetadataPayloads.back().SetContentType(ZenContentType::kCbObject);
+ }
+ else if (Attachment->IsCompressedBinary())
+ {
+ MetadataPayloads.push_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
+ MetadataPayloads.back().SetContentType(ZenContentType::kCompressedBinary);
+ }
+ else
+ {
+ ZEN_ASSERT(Attachment->IsBinary());
+ MetadataPayloads.push_back(Attachment->AsBinary().AsIoBuffer());
+ MetadataPayloads.back().SetContentType(ZenContentType::kBinary);
+ }
+
+ BlobsArrayIt++;
+ MetadataArrayIt++;
+ }
+ m_BuildStore.PutMetadatas(BlobHashes, MetadataPayloads);
+ return ServerRequest.WriteResponse(HttpResponseCode::OK);
+}
+
+void
+HttpBuildStoreService::GetMetadatasRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("HttpBuildStoreService::GetMetadatasRequest");
+ HttpServerRequest& ServerRequest = Req.ServerRequest();
+ std::string_view Namespace = Req.GetCapture(1);
+ std::string_view Bucket = Req.GetCapture(2);
+ std::string_view BuildId = Req.GetCapture(3);
+ ZEN_UNUSED(Namespace, Bucket, BuildId);
+ IoBuffer RequestPayload = ServerRequest.ReadPayload();
+ if (!RequestPayload)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Expected compact binary body for metadata request, body is missing");
+ }
+ if (RequestPayload.GetContentType() != HttpContentType::kCbObject)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Expected compact binary body for metadata request, got {}", ToString(RequestPayload.GetContentType())));
+ }
+ if (CbValidateError ValidateError = ValidateCompactBinary(RequestPayload.GetView(), CbValidateMode::Default);
+ ValidateError != CbValidateError::None)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Compact binary body for metadata request is not valid, reason: {}", ToString(ValidateError)));
+ }
+ CbObject RequestObject = LoadCompactBinaryObject(RequestPayload);
+ CbArrayView BlobsArray = RequestObject["blobHashes"sv].AsArrayView();
+ if (!BlobsArray)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Compact binary body for metadata request is missing 'blobHashes' array");
+ }
+ const uint64_t BlobCount = BlobsArray.Num();
+
+ std::vector<IoHash> BlobRawHashes;
+ BlobRawHashes.reserve(BlobCount);
+ for (CbFieldView BlockHashView : BlobsArray)
+ {
+ BlobRawHashes.push_back(BlockHashView.AsHash());
+ if (BlobRawHashes.back() == IoHash::Zero)
+ {
+ const uint8_t Type = (uint8_t)BlockHashView.GetValue().GetType();
+ return ServerRequest.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Compact binary body for metadata 'blobHashes' array contains invalid field type: {}", Type));
+ }
+ }
+ m_BuildStoreStats.BlobMetaReadCount += BlobRawHashes.size();
+ std::vector<IoBuffer> BlockMetadatas = m_BuildStore.GetMetadatas(BlobRawHashes, &GetSmallWorkerPool(EWorkloadType::Burst));
+
+ CbPackage ResponsePackage;
+ std::vector<CbAttachment> Attachments;
+ tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes;
+ Attachments.reserve(BlobCount);
+ AttachmentHashes.reserve(BlobCount);
+ {
+ CbObjectWriter ResponseWriter;
+
+ ResponseWriter.BeginArray("blobHashes");
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
+ {
+ if (BlockMetadatas[BlockHashIndex])
+ {
+ const IoHash& BlockHash = BlobRawHashes[BlockHashIndex];
+ ResponseWriter.AddHash(BlockHash);
+ }
+ }
+ ResponseWriter.EndArray(); // blobHashes
+
+ ResponseWriter.BeginArray("metadatas");
+
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
+ {
+ if (IoBuffer Metadata = BlockMetadatas[BlockHashIndex]; Metadata)
+ {
+ switch (Metadata.GetContentType())
+ {
+ case ZenContentType::kCbObject:
+ {
+ CbObject Object = CbObject(SharedBuffer(std::move(Metadata)).MakeOwned());
+ const IoHash ObjectHash = Object.GetHash();
+ ResponseWriter.AddBinaryAttachment(ObjectHash);
+ if (!AttachmentHashes.contains(ObjectHash))
+ {
+ Attachments.push_back(CbAttachment(Object, ObjectHash));
+ AttachmentHashes.insert(ObjectHash);
+ }
+ }
+ break;
+ case ZenContentType::kCompressedBinary:
+ {
+ IoHash RawHash;
+ uint64_t _;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Metadata)), RawHash, _);
+ ResponseWriter.AddBinaryAttachment(RawHash);
+ if (!AttachmentHashes.contains(RawHash))
+ {
+ Attachments.push_back(CbAttachment(Compressed, RawHash));
+ AttachmentHashes.insert(RawHash);
+ }
+ }
+ break;
+ default:
+ {
+ const IoHash RawHash = IoHash::HashBuffer(Metadata);
+ ResponseWriter.AddBinaryAttachment(RawHash);
+ if (!AttachmentHashes.contains(RawHash))
+ {
+ Attachments.push_back(CbAttachment(SharedBuffer(Metadata), RawHash));
+ AttachmentHashes.insert(RawHash);
+ }
+ }
+ break;
+ }
+ }
+ }
+
+ ResponseWriter.EndArray(); // metadatas
+
+ ResponsePackage.SetObject(ResponseWriter.Save());
+ }
+ ResponsePackage.AddAttachments(Attachments);
+
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage);
+ ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+}
+
+void
+HttpBuildStoreService::BlobsExistsRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("HttpBuildStoreService::BlobsExistsRequest");
+ HttpServerRequest& ServerRequest = Req.ServerRequest();
+ std::string_view Namespace = Req.GetCapture(1);
+ std::string_view Bucket = Req.GetCapture(2);
+ std::string_view BuildId = Req.GetCapture(3);
+ ZEN_UNUSED(Namespace, Bucket, BuildId);
+ IoBuffer RequestPayload = ServerRequest.ReadPayload();
+ if (!RequestPayload)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Expected compact binary body for blob exists request, body is missing");
+ }
+ if (RequestPayload.GetContentType() != HttpContentType::kCbObject)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Expected compact binary body for blob exists request, got {}", ToString(RequestPayload.GetContentType())));
+ }
+ if (CbValidateError ValidateError = ValidateCompactBinary(RequestPayload.GetView(), CbValidateMode::Default);
+ ValidateError != CbValidateError::None)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Compact binary body for blob exists request is not valid, reason: {}", ToString(ValidateError)));
+ }
+ CbObject RequestObject = LoadCompactBinaryObject(RequestPayload);
+ CbArrayView BlobsArray = RequestObject["blobHashes"sv].AsArrayView();
+ if (!BlobsArray)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Compact binary body for blob exists request is missing 'blobHashes' array");
+ }
+
+ std::vector<IoHash> BlobRawHashes;
+ BlobRawHashes.reserve(BlobsArray.Num());
+ for (CbFieldView BlockHashView : BlobsArray)
+ {
+ BlobRawHashes.push_back(BlockHashView.AsHash());
+ if (BlobRawHashes.back() == IoHash::Zero)
+ {
+ const uint8_t Type = (uint8_t)BlockHashView.GetValue().GetType();
+ return ServerRequest.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Compact binary body for blob exists request 'blobHashes' array contains invalid field type: {}", Type));
+ }
+ }
+
+ m_BuildStoreStats.BlobExistsCount += BlobRawHashes.size();
+ std::vector<BuildStore::BlobExistsResult> BlobsExists = m_BuildStore.BlobsExists(BlobRawHashes);
+ CbObjectWriter ResponseWriter(9 * BlobsExists.size());
+ ResponseWriter.BeginArray("blobExists"sv);
+ for (const BuildStore::BlobExistsResult& BlobExists : BlobsExists)
+ {
+ ResponseWriter.AddBool(BlobExists.HasBody);
+ if (BlobExists.HasBody)
+ {
+ m_BuildStoreStats.BlobExistsBodyHitCount++;
+ }
+ }
+ ResponseWriter.EndArray(); // blobExist
+ ResponseWriter.BeginArray("metadataExists"sv);
+ for (const BuildStore::BlobExistsResult& BlobExists : BlobsExists)
+ {
+ ResponseWriter.AddBool(BlobExists.HasBody);
+ if (BlobExists.HasMetadata)
+ {
+ m_BuildStoreStats.BlobExistsMetaHitCount++;
+ }
+ }
+ ResponseWriter.EndArray(); // metadataExists
+ CbObject ResponseObject = ResponseWriter.Save();
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, ResponseObject);
+}
+
+void
+HttpBuildStoreService::HandleStatsRequest(HttpServerRequest& Request)
+{
+ ZEN_TRACE_CPU("HttpBuildStoreService::Stats");
+ CbObjectWriter Cbo;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+
+ Cbo.BeginObject("builds");
+ {
+ Cbo.BeginObject("blobs");
+ {
+ Cbo << "readcount" << m_BuildStoreStats.BlobReadCount << "writecount" << m_BuildStoreStats.BlobWriteCount << "hitcount"
+ << m_BuildStoreStats.BlobHitCount;
+ }
+ Cbo.EndObject();
+
+ Cbo.BeginObject("metadata");
+ {
+ Cbo << "readcount" << m_BuildStoreStats.BlobMetaReadCount << "writecount" << m_BuildStoreStats.BlobMetaWriteCount << "hitcount"
+ << m_BuildStoreStats.BlobMetaHitCount;
+ }
+ Cbo.EndObject();
+
+ Cbo << "requestcount" << m_BuildStoreStats.RequestCount;
+ Cbo << "badrequestcount" << m_BuildStoreStats.BadRequestCount;
+ }
+ Cbo.EndObject();
+
+ return Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+} // namespace zen
diff --git a/src/zenserver/buildstore/httpbuildstore.h b/src/zenserver/buildstore/httpbuildstore.h
new file mode 100644
index 000000000..a59aa882a
--- /dev/null
+++ b/src/zenserver/buildstore/httpbuildstore.h
@@ -0,0 +1,65 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/stats.h>
+#include <zenhttp/httpserver.h>
+#include <zenhttp/httpstats.h>
+
+#include <filesystem>
+
+namespace zen {
+
+class BuildStore;
+
+class HttpBuildStoreService final : public zen::HttpService, public IHttpStatsProvider
+{
+public:
+ HttpBuildStoreService(HttpStatsService& StatsService, BuildStore& Store);
+ virtual ~HttpBuildStoreService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+
+private:
+ struct BuildStoreStats
+ {
+ std::atomic_uint64_t BlobReadCount{};
+ std::atomic_uint64_t BlobHitCount{};
+ std::atomic_uint64_t BlobWriteCount{};
+ std::atomic_uint64_t BlobMetaReadCount{};
+ std::atomic_uint64_t BlobMetaHitCount{};
+ std::atomic_uint64_t BlobMetaWriteCount{};
+ std::atomic_uint64_t BlobExistsCount{};
+ std::atomic_uint64_t BlobExistsBodyHitCount{};
+ std::atomic_uint64_t BlobExistsMetaHitCount{};
+ std::atomic_uint64_t RequestCount{};
+ std::atomic_uint64_t BadRequestCount{};
+ };
+
+ void Initialize();
+
+ inline LoggerRef Log() { return m_Log; }
+
+ LoggerRef m_Log;
+
+ void PutBlobRequest(HttpRouterRequest& Req);
+ void GetBlobRequest(HttpRouterRequest& Req);
+
+ void PutMetadataRequest(HttpRouterRequest& Req);
+ void GetMetadatasRequest(HttpRouterRequest& Req);
+
+ void BlobsExistsRequest(HttpRouterRequest& Req);
+
+ HttpRequestRouter m_Router;
+
+ HttpStatsService& m_StatsService;
+
+ BuildStore& m_BuildStore;
+ BuildStoreStats m_BuildStoreStats;
+ metrics::OperationTiming m_HttpRequests;
+};
+
+} // namespace zen
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 809092378..0da98210c 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -377,6 +377,9 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("server.objectstore.enabled"sv, ServerOptions.ObjectStoreEnabled, "objectstore-enabled"sv);
LuaOptions.AddOption("server.objectstore.buckets"sv, ServerOptions.ObjectStoreConfig);
+ ////// buildsstore
+ LuaOptions.AddOption("server.buildstore.enabled"sv, ServerOptions.BuildStoreConfig.Enabled, "buildstore-enabled"sv);
+
////// network
LuaOptions.AddOption("network.httpserverclass"sv, ServerOptions.HttpServerConfig.ServerClass, "http"sv);
LuaOptions.AddOption("network.httpserverthreads"sv, ServerOptions.HttpServerConfig.ThreadCount, "http-threads"sv);
@@ -1031,6 +1034,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<std::vector<std::string>>(BucketConfigs),
"");
+ options.add_option("buildstore",
+ "",
+ "buildstore-enabled",
+ "Whether the builds store is enabled or not.",
+ cxxopts::value<bool>(ServerOptions.BuildStoreConfig.Enabled)->default_value("false"),
+ "");
+
options.add_option("stats",
"",
"statsd",
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index c7781aada..a87b6f8b3 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -59,11 +59,17 @@ struct ZenProjectStoreEvictionPolicy
int32_t MaxDurationSeconds = 7 * 24 * 60 * 60;
};
+struct ZenBuildStoreEvictionPolicy
+{
+ int32_t MaxDurationSeconds = 3 * 24 * 60 * 60;
+};
+
struct ZenGcConfig
{
// ZenCasEvictionPolicy Cas;
ZenCacheEvictionPolicy Cache;
ZenProjectStoreEvictionPolicy ProjectStore;
+ ZenBuildStoreEvictionPolicy BuildStore;
int32_t MonitorIntervalSeconds = 30;
int32_t IntervalSeconds = 0;
bool CollectSmallObjects = true;
@@ -130,6 +136,11 @@ struct ZenProjectStoreConfig
bool StoreProjectAttachmentMetaData = false;
};
+struct ZenBuildStoreConfig
+{
+ bool Enabled = false;
+};
+
struct ZenWorkspacesConfig
{
bool Enabled = false;
@@ -145,6 +156,7 @@ struct ZenServerOptions
zen::HttpServerConfig HttpServerConfig;
ZenStructuredCacheConfig StructuredCacheConfig;
ZenProjectStoreConfig ProjectStoreConfig;
+ ZenBuildStoreConfig BuildStoreConfig;
ZenStatsConfig StatsConfig;
ZenWorkspacesConfig WorksSpacesConfig;
std::filesystem::path SystemRootDir; // System root directory (used for machine level config)
diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp
index 8a4b977ad..0b7fd0400 100644
--- a/src/zenserver/workspaces/httpworkspaces.cpp
+++ b/src/zenserver/workspaces/httpworkspaces.cpp
@@ -84,7 +84,7 @@ HttpWorkspacesService::HttpWorkspacesService(HttpStatsService& StatsService, con
HttpWorkspacesService::~HttpWorkspacesService()
{
- m_StatsService.UnregisterHandler("prj", *this);
+ m_StatsService.UnregisterHandler("ws", *this);
}
const char*
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index f84bc0b00..03e269d49 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -23,6 +23,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpserver.h>
+#include <zenstore/buildstore/buildstore.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
#include <zenstore/workspaces.h>
@@ -262,6 +263,13 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
*m_Workspaces));
}
+ if (ServerOptions.BuildStoreConfig.Enabled)
+ {
+ BuildStoreConfig ObjCfg;
+ ObjCfg.RootDirectory = m_DataRoot / "builds";
+ m_BuildStore = std::make_unique<BuildStore>(std::move(ObjCfg), m_GcManager);
+ }
+
if (ServerOptions.StructuredCacheConfig.Enabled)
{
InitializeStructuredCache(ServerOptions);
@@ -310,6 +318,12 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_Http->RegisterService(*m_ObjStoreService);
}
+ if (ServerOptions.BuildStoreConfig.Enabled)
+ {
+ m_BuildStoreService = std::make_unique<HttpBuildStoreService>(m_StatsService, *m_BuildStore);
+ m_Http->RegisterService(*m_BuildStoreService);
+ }
+
#if ZEN_WITH_VFS
m_VfsService = std::make_unique<VfsService>();
m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore));
@@ -327,6 +341,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
.Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
.MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
.MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds),
+ .MaxBuildStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.BuildStore.MaxDurationSeconds),
.CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
.Enabled = ServerOptions.GcConfig.Enabled,
.DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
@@ -347,6 +362,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_CacheStore.Get(),
m_CidStore.get(),
m_ProjectStore,
+ m_BuildStore.get(),
HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile,
.HttpLogPath = ServerOptions.DataDir / "logs" / "http.log",
.CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"},
@@ -801,6 +817,9 @@ ZenServer::Cleanup()
m_ObjStoreService.reset();
m_FrontendService.reset();
+ m_BuildStoreService.reset();
+ m_BuildStore = {};
+
m_StructuredCacheService.reset();
m_UpstreamService.reset();
m_UpstreamCache.reset();
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index 80054dc35..5cfa04ba1 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -25,6 +25,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/gc.h>
#include "admin/admin.h"
+#include "buildstore/httpbuildstore.h"
#include "cache/httpstructuredcache.h"
#include "diag/diagsvcs.h"
#include "frontend/frontend.h"
@@ -127,6 +128,8 @@ private:
Ref<ZenCacheStore> m_CacheStore;
std::unique_ptr<OpenProcessCache> m_OpenProcessCache;
HttpTestService m_TestService;
+ std::unique_ptr<BuildStore> m_BuildStore;
+
#if ZEN_WITH_TESTS
HttpTestingService m_TestingService;
#endif
@@ -140,6 +143,7 @@ private:
HttpHealthService m_HealthService;
std::unique_ptr<HttpFrontendService> m_FrontendService;
std::unique_ptr<HttpObjectStoreService> m_ObjStoreService;
+ std::unique_ptr<HttpBuildStoreService> m_BuildStoreService;
std::unique_ptr<VfsService> m_VfsService;
std::unique_ptr<JobQueue> m_JobQueue;
std::unique_ptr<HttpAdminService> m_AdminService;
diff --git a/src/zenstore-test/zenstore-test.cpp b/src/zenstore-test/zenstore-test.cpp
index e5b312984..c56971520 100644
--- a/src/zenstore-test/zenstore-test.cpp
+++ b/src/zenstore-test/zenstore-test.cpp
@@ -2,6 +2,7 @@
#include <zencore/filesystem.h>
#include <zencore/logging.h>
+#include <zenstore/buildstore/buildstore.h>
#include <zenstore/zenstore.h>
#include <zencore/memory/newdelete.h>
@@ -18,6 +19,7 @@ main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
zen::zenstore_forcelinktests();
zen::logging::InitializeLogging();
+ zen::buildstore_forcelink();
zen::MaximizeOpenFileCount();
return ZEN_RUN_TESTS(argc, argv);
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index e976c061d..63c0388fa 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -578,7 +578,7 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons
}
void
-BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback)
+BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback)
{
ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStore::WriteChunks");
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
new file mode 100644
index 000000000..8674aab75
--- /dev/null
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -0,0 +1,1475 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/buildstore/buildstore.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory/llm.h>
+#include <zencore/scopeguard.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+
+#include <zencore/uid.h>
+#include <zencore/xxhash.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compress.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+const FLLMTag&
+GetBuildstoreTag()
+{
+ static FLLMTag _("store", FLLMTag("builds"));
+
+ return _;
+}
+
+using namespace std::literals;
+
+namespace blobstore::impl {
+
+ const std::string BaseName = "builds";
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".slog";
+
+ std::filesystem::path GetBlobIndexPath(const std::filesystem::path& RootDirectory)
+ {
+ return RootDirectory / (BaseName + IndexExtension);
+ }
+
+ std::filesystem::path GetBlobLogPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + LogExtension); }
+
+ std::filesystem::path GetMetaIndexPath(const std::filesystem::path& RootDirectory)
+ {
+ return RootDirectory / (BaseName + "_meta" + IndexExtension);
+ }
+
+ std::filesystem::path GetMetaLogPath(const std::filesystem::path& RootDirectory)
+ {
+ return RootDirectory / (BaseName + "_meta" + LogExtension);
+ }
+} // namespace blobstore::impl
+
+BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc)
+: m_Config(Config)
+, m_Gc(Gc)
+, m_LargeBlobStore(m_Gc)
+, m_SmallBlobStore(Gc)
+, m_MetadataBlockStore()
+{
+ ZEN_TRACE_CPU("BuildStore::BuildStore");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+ try
+ {
+ std::filesystem::path BlobLogPath = blobstore::impl::GetBlobLogPath(Config.RootDirectory);
+ std::filesystem::path MetaLogPath = blobstore::impl::GetMetaLogPath(Config.RootDirectory);
+ bool IsNew = !(std::filesystem::exists(BlobLogPath) && std::filesystem::exists(MetaLogPath));
+
+ if (!IsNew)
+ {
+ m_BlobLogFlushPosition = ReadPayloadLog(RwLock::ExclusiveLockScope(m_Lock), BlobLogPath, 0);
+ m_MetaLogFlushPosition = ReadMetadataLog(RwLock::ExclusiveLockScope(m_Lock), MetaLogPath, 0);
+ }
+ m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew);
+ m_SmallBlobStore.Initialize(Config.RootDirectory,
+ "blob_cas",
+ m_Config.SmallBlobBlockStoreMaxBlockSize,
+ m_Config.SmallBlobBlockStoreAlignement,
+ IsNew);
+ m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20);
+ {
+ BlockStore::BlockIndexSet KnownBlocks;
+ for (const BlobEntry& Blob : m_BlobEntries)
+ {
+ if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex)
+ {
+ const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex];
+ KnownBlocks.insert(Metadata.Location.BlockIndex);
+ }
+ }
+ m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ }
+
+ m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite);
+ m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite);
+
+ m_Gc.AddGcReferencer(*this);
+ m_Gc.AddGcReferenceLocker(*this);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to initialize build store. Reason: '{}'", Ex.what());
+ m_Gc.RemoveGcReferenceLocker(*this);
+ m_Gc.RemoveGcReferencer(*this);
+ }
+}
+
+BuildStore::~BuildStore()
+{
+ try
+ {
+ ZEN_TRACE_CPU("BuildStore::~BuildStore");
+ m_Gc.RemoveGcReferenceLocker(*this);
+ m_Gc.RemoveGcReferencer(*this);
+ Flush();
+ m_MetadatalogFile.Close();
+ m_PayloadlogFile.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~BuildStore() threw exception: {}", Ex.what());
+ }
+}
+
+void
+BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
+{
+ ZEN_TRACE_CPU("BuildStore::PutBlob");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCompressedBinary);
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
+ {
+ const BlobIndex BlobIndex = It->second;
+ if (m_BlobEntries[BlobIndex].Payload)
+ {
+ return;
+ }
+ }
+ }
+
+ PayloadEntry Entry;
+ if (Payload.GetSize() > m_Config.SmallBlobBlockStoreMaxBlockEmbedSize)
+ {
+ CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash);
+ ZEN_UNUSED(Result);
+ Entry = {.Flags = PayloadEntry::kStandalone};
+ }
+ else
+ {
+ CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash);
+ ZEN_UNUSED(Result);
+ Entry = {.Flags = 0};
+ }
+ m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
+ {
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
+ if (Blob.Payload)
+ {
+ m_PayloadEntries[Blob.Payload] = Entry;
+ }
+ else
+ {
+ Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ m_PayloadEntries.push_back(Entry);
+ }
+ Blob.LastAccessTime = GcClock::TickCount();
+ }
+ else
+ {
+ PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ m_PayloadEntries.push_back(Entry);
+
+ const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+ // we only remove during GC and compact this then...
+ m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
+ m_BlobLookup.insert({BlobHash, NewBlobIndex});
+ }
+}
+
+IoBuffer
+BuildStore::GetBlob(const IoHash& BlobHash)
+{
+ ZEN_TRACE_CPU("BuildStore::GetBlob");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+ RwLock::SharedLockScope Lock(m_Lock);
+ if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
+ {
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
+ Blob.LastAccessTime = GcClock::TickCount();
+ if (Blob.Payload)
+ {
+ const PayloadEntry& Entry = m_PayloadEntries[Blob.Payload];
+ const bool IsStandalone = (Entry.Flags & PayloadEntry::kStandalone) != 0;
+ Lock.ReleaseNow();
+
+ IoBuffer Chunk;
+ if (IsStandalone)
+ {
+ ZEN_TRACE_CPU("GetLarge");
+ Chunk = m_LargeBlobStore.FindChunk(BlobHash);
+ }
+ else
+ {
+ ZEN_TRACE_CPU("GetSmall");
+ Chunk = m_SmallBlobStore.FindChunk(BlobHash);
+ }
+ if (Chunk)
+ {
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+ return Chunk;
+ }
+ else
+ {
+ ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? "on disk" : "in block");
+ }
+ }
+ }
+ return {};
+}
+
+std::vector<BuildStore::BlobExistsResult>
+BuildStore::BlobsExists(std::span<const IoHash> BlobHashes)
+{
+ ZEN_TRACE_CPU("BuildStore::BlobsExists");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+ std::vector<BuildStore::BlobExistsResult> Result;
+ Result.reserve(BlobHashes.size());
+ RwLock::SharedLockScope _(m_Lock);
+ for (const IoHash& BlobHash : BlobHashes)
+ {
+ if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
+ {
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
+ bool HasPayload = !!Blob.Payload;
+ bool HasMetadata = !!Blob.Metadata;
+ Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata});
+ }
+ else
+ {
+ Result.push_back({});
+ }
+ }
+ return Result;
+}
+
+void
+BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas)
+{
+ ZEN_TRACE_CPU("BuildStore::PutMetadatas");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+ size_t WriteBlobIndex = 0;
+ m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span<BlockStoreLocation> Locations) {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ for (size_t LocationIndex = 0; LocationIndex < Locations.size(); LocationIndex++)
+ {
+ const IoBuffer& Data = MetaDatas[WriteBlobIndex];
+ const IoHash& BlobHash = BlobHashes[WriteBlobIndex];
+ const BlockStoreLocation& Location = Locations[LocationIndex];
+
+ MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0};
+ m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
+
+ if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
+ {
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
+ if (Blob.Metadata)
+ {
+ m_MetadataEntries[Blob.Metadata] = Entry;
+ }
+ else
+ {
+ Blob.Metadata = MetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size()));
+ m_MetadataEntries.push_back(Entry);
+ }
+ Blob.LastAccessTime = GcClock::TickCount();
+ }
+ else
+ {
+ MetadataIndex NewMetadataIndex = MetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size()));
+ m_MetadataEntries.push_back(Entry);
+
+ const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+ m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
+ m_BlobLookup.insert({BlobHash, NewBlobIndex});
+ }
+ WriteBlobIndex++;
+ if (m_TrackedCacheKeys)
+ {
+ m_TrackedCacheKeys->insert(BlobHash);
+ }
+ }
+ });
+}
+
+std::vector<IoBuffer>
+BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool)
+{
+ ZEN_TRACE_CPU("BuildStore::GetMetadatas");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+ std::vector<BlockStoreLocation> MetaLocations;
+ std::vector<size_t> MetaLocationResultIndexes;
+ MetaLocations.reserve(BlobHashes.size());
+ MetaLocationResultIndexes.reserve(BlobHashes.size());
+ tsl::robin_set<uint32_t> ReferencedBlocks;
+
+ std::vector<IoBuffer> Result;
+ std::vector<ZenContentType> ResultContentTypes;
+ Result.resize(BlobHashes.size());
+ ResultContentTypes.resize(BlobHashes.size(), ZenContentType::kUnknownContentType);
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ for (size_t Index = 0; Index < BlobHashes.size(); Index++)
+ {
+ const IoHash& BlobHash = BlobHashes[Index];
+ if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
+ {
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& ExistingBlobEntry = m_BlobEntries[ExistingBlobIndex];
+ if (ExistingBlobEntry.Metadata)
+ {
+ const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata];
+ MetaLocations.push_back(ExistingMetadataEntry.Location);
+ MetaLocationResultIndexes.push_back(Index);
+ ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex);
+ ResultContentTypes[Index] = ExistingMetadataEntry.ContentType;
+ }
+ ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::Tick());
+ }
+ }
+ }
+
+ auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) {
+ if (ChunkIndexes.size() < 4)
+ {
+ for (size_t ChunkIndex : ChunkIndexes)
+ {
+ IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]);
+ if (Chunk)
+ {
+ size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
+ Result[ResultIndex] = std::move(Chunk);
+ }
+ }
+ return true;
+ }
+ return m_MetadataBlockStore.IterateBlock(
+ MetaLocations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ if (Data != nullptr)
+ {
+ size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
+ Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size);
+ }
+ return true;
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
+ Result[ResultIndex] = File.GetChunk(Offset, Size);
+ return true;
+ },
+ 8u * 1024u);
+ };
+
+ if (!MetaLocations.empty())
+ {
+ Latch WorkLatch(1);
+
+ m_MetadataBlockStore.IterateChunks(MetaLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) -> bool {
+ ZEN_UNUSED(BlockIndex);
+ if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1)
+ {
+ return DoOneBlock(ChunkIndexes);
+ }
+ else
+ {
+ ZEN_ASSERT(OptionalWorkerPool != nullptr);
+ WorkLatch.AddCount(1);
+ try
+ {
+ OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ try
+ {
+ DoOneBlock(ChunkIndexes);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ WorkLatch.CountDown();
+ ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
+ }
+ return true;
+ }
+ });
+
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ }
+ for (size_t Index = 0; Index < Result.size(); Index++)
+ {
+ if (Result[Index])
+ {
+ Result[Index].SetContentType(ResultContentTypes[Index]);
+ }
+ }
+ return Result;
+}
+
+void
+BuildStore::Flush()
+{
+ ZEN_TRACE_CPU("BuildStore::Flush");
+ try
+ {
+ m_LargeBlobStore.Flush();
+ m_SmallBlobStore.Flush();
+ m_MetadataBlockStore.Flush(false);
+
+ m_PayloadlogFile.Flush();
+ m_MetadatalogFile.Flush();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("BuildStore::Flush failed. Reason: {}", Ex.what());
+ }
+}
+
+void
+BuildStore::CompactState()
+{
+ ZEN_TRACE_CPU("BuildStore::CompactState");
+
+ std::vector<BlobEntry> BlobEntries;
+ std::vector<PayloadEntry> PayloadEntries;
+ std::vector<MetadataEntry> MetadataEntries;
+
+ tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> BlobLookup;
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+ const size_t EntryCount = m_BlobLookup.size();
+ BlobLookup.reserve(EntryCount);
+ const size_t PayloadCount = m_PayloadEntries.size();
+ PayloadEntries.reserve(PayloadCount);
+ const size_t MetadataCount = m_MetadataEntries.size();
+ MetadataEntries.reserve(MetadataCount);
+
+ for (auto LookupIt : m_BlobLookup)
+ {
+ const IoHash& BlobHash = LookupIt.first;
+ const BlobIndex ReadBlobIndex = LookupIt.second;
+ const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+
+ const BlobIndex WriteBlobIndex(gsl::narrow<uint32_t>(BlobEntries.size()));
+ BlobEntries.push_back(ReadBlobEntry);
+ BlobEntry& WriteBlobEntry = BlobEntries.back();
+
+ if (WriteBlobEntry.Payload)
+ {
+ const PayloadEntry& ReadPayloadEntry = m_PayloadEntries[ReadBlobEntry.Payload];
+ WriteBlobEntry.Payload = PayloadIndex(gsl::narrow<uint32_t>(PayloadEntries.size()));
+ PayloadEntries.push_back(ReadPayloadEntry);
+ }
+ if (ReadBlobEntry.Metadata)
+ {
+ const MetadataEntry& ReadMetadataEntry = m_MetadataEntries[ReadBlobEntry.Metadata];
+ WriteBlobEntry.Metadata = MetadataIndex(gsl::narrow<uint32_t>(MetadataEntries.size()));
+ MetadataEntries.push_back(ReadMetadataEntry);
+ }
+
+ BlobLookup.insert({BlobHash, WriteBlobIndex});
+ }
+ m_BlobEntries.swap(BlobEntries);
+ m_PayloadEntries.swap(PayloadEntries);
+ m_MetadataEntries.swap(MetadataEntries);
+ m_BlobLookup.swap(BlobLookup);
+}
+
+uint64_t
+BuildStore::ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
+{
+ ZEN_TRACE_CPU("BuildStore::ReadPayloadLog");
+ if (!std::filesystem::is_regular_file(LogPath))
+ {
+ return 0;
+ }
+
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read build store '{}' payload log containing {} entries in {}",
+ LogPath,
+ LogEntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ TCasLogFile<PayloadDiskEntry> CasLog;
+ if (!CasLog.IsValid(LogPath))
+ {
+ std::filesystem::remove(LogPath);
+ return 0;
+ }
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (!CasLog.Initialize())
+ {
+ return 0;
+ }
+
+ const uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full payload log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+
+ LogEntryCount = EntryCount - SkipEntryCount;
+ uint64_t InvalidEntryCount = 0;
+
+ CasLog.Replay(
+ [&](const PayloadDiskEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Entry.Flags & PayloadEntry::kTombStone)
+ {
+ // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation
+ m_BlobLookup.erase(Record.BlobHash);
+ return;
+ }
+
+ if (!ValidatePayloadDiskEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid payload entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end())
+ {
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex];
+ if (ExistingBlob.Payload)
+ {
+ const PayloadIndex ExistingPayloadIndex = ExistingBlob.Payload;
+ m_PayloadEntries[ExistingPayloadIndex] = Record.Entry;
+ }
+ else
+ {
+ const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ m_PayloadEntries.push_back(Record.Entry);
+ ExistingBlob.Payload = NewPayloadIndex;
+ }
+ }
+ else
+ {
+ const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ m_PayloadEntries.push_back(Record.Entry);
+
+ const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+ m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::Tick())});
+ m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex);
+ }
+ },
+ SkipEntryCount);
+
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid payload entries in '{}'", InvalidEntryCount, LogPath);
+ }
+
+ return LogEntryCount;
+}
+
+uint64_t
+BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
+{
+ ZEN_TRACE_CPU("BuildStore::ReadMetadataLog");
+ if (!std::filesystem::is_regular_file(LogPath))
+ {
+ return 0;
+ }
+
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read build store '{}' metadata log containing {} entries in {}",
+ LogPath,
+ LogEntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ TCasLogFile<MetadataDiskEntry> CasLog;
+ if (!CasLog.IsValid(LogPath))
+ {
+ std::filesystem::remove(LogPath);
+ return 0;
+ }
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (!CasLog.Initialize())
+ {
+ return 0;
+ }
+
+ const uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full metadata log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+
+ LogEntryCount = EntryCount - SkipEntryCount;
+ uint64_t InvalidEntryCount = 0;
+
+ CasLog.Replay(
+ [&](const MetadataDiskEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Entry.Flags & MetadataEntry::kTombStone)
+ {
+ // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation
+ m_BlobLookup.erase(Record.BlobHash);
+ return;
+ }
+
+ if (!ValidateMetadataDiskEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid metadata entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end())
+ {
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex];
+ if (ExistingBlob.Metadata)
+ {
+ const MetadataIndex ExistingMetadataIndex = ExistingBlob.Metadata;
+ m_MetadataEntries[ExistingMetadataIndex] = Record.Entry;
+ }
+ else
+ {
+ const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size()));
+ m_MetadataEntries.push_back(Record.Entry);
+ ExistingBlob.Metadata = NewMetadataIndex;
+ }
+ }
+ else
+ {
+ const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size()));
+ m_MetadataEntries.push_back(Record.Entry);
+
+ const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+ m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::Tick())});
+ m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex);
+ }
+ },
+ SkipEntryCount);
+
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid metadata entries in '{}'", InvalidEntryCount, LogPath);
+ }
+
+ return LogEntryCount;
+}
+
+bool
+BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason)
+{
+ if (Entry.BlobHash == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString());
+ return false;
+ }
+ if (Entry.Entry.Flags & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.Flags, Entry.BlobHash.ToHexString());
+ return false;
+ }
+ if (Entry.Entry.Flags & PayloadEntry::kTombStone)
+ {
+ return true;
+ }
+ if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0 || Entry.Entry.Reserved3 != 0)
+ {
+ OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString());
+ return false;
+ }
+ return true;
+}
+
+bool
+BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason)
+{
+ if (Entry.BlobHash == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString());
+ return false;
+ }
+ if (Entry.Entry.Location.Size == 0)
+ {
+ OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.Location.Size);
+ return false;
+ }
+ if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0)
+ {
+ OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString());
+ return false;
+ }
+ if (Entry.Entry.Flags & MetadataEntry::kTombStone)
+ {
+ return true;
+ }
+ if (Entry.Entry.ContentType == ZenContentType::kCOUNT)
+ {
+ OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString());
+ return false;
+ }
+ if (Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0)
+ {
+ OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString());
+ return false;
+ }
+ return true;
+}
+
+class BuildStoreGcReferenceChecker : public GcReferenceChecker
+{
+public:
+ BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {}
+ virtual std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string());
+ }
+
+ virtual void PreCache(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); }
+
+ virtual void UpdateLockedState(GcCtx& Ctx) override
+ {
+ ZEN_TRACE_CPU("Builds::UpdateLockedState");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ m_References.reserve(m_Store.m_BlobLookup.size());
+ for (const auto& It : m_Store.m_BlobLookup)
+ {
+ const BuildStore::BlobIndex ExistingBlobIndex = It.second;
+ if (m_Store.m_BlobEntries[ExistingBlobIndex].Payload)
+ {
+ m_References.push_back(It.first);
+ }
+ }
+ FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore"), m_References);
+ }
+
+ virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
+ {
+ ZEN_UNUSED(Ctx);
+ ZEN_TRACE_CPU("Builds::GetUnusedReferences");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ size_t InitialCount = IoCids.size();
+ size_t UsedCount = InitialCount;
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: buildstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
+ "buildstore",
+ UsedCount,
+ InitialCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids);
+ UsedCount = IoCids.size() - UnusedReferences.size();
+ return UnusedReferences;
+ }
+
+private:
+ BuildStore& m_Store;
+ std::vector<IoHash> m_References;
+};
+
+std::string
+BuildStore::GetGcName(GcCtx& Ctx)
+{
+ ZEN_UNUSED(Ctx);
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string());
+}
+
+class BuildStoreGcCompator : public GcStoreCompactor
+{
+ using BlobEntry = BuildStore::BlobEntry;
+ using PayloadEntry = BuildStore::PayloadEntry;
+ using MetadataEntry = BuildStore::MetadataEntry;
+ using MetadataDiskEntry = BuildStore::MetadataDiskEntry;
+ using BlobIndex = BuildStore::BlobIndex;
+ using PayloadIndex = BuildStore::PayloadIndex;
+ using MetadataIndex = BuildStore::MetadataIndex;
+
+public:
+ BuildStoreGcCompator(BuildStore& Store, std::vector<IoHash>&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {}
+
+ virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
+ {
+ ZEN_UNUSED(ClaimDiskReserveCallback);
+ ZEN_TRACE_CPU("Builds::CompactStore");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}",
+ m_Store.m_Config.RootDirectory,
+ NiceBytes(Stats.RemovedDisk),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ if (!m_RemovedBlobs.empty())
+ {
+ if (Ctx.Settings.CollectSmallObjects)
+ {
+ m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique<HashSet>(); });
+ auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); });
+
+ BlockStore::BlockUsageMap BlockUsage;
+ {
+ RwLock::SharedLockScope __(m_Store.m_Lock);
+
+ for (auto LookupIt : m_Store.m_BlobLookup)
+ {
+ const BlobIndex ReadBlobIndex = LookupIt.second;
+ const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex];
+
+ if (ReadBlobEntry.Metadata)
+ {
+ const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata];
+
+ uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex;
+ uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement);
+
+ if (auto BlockUsageIt = BlockUsage.find(BlockIndex); BlockUsageIt != BlockUsage.end())
+ {
+ BlockStore::BlockUsageInfo& Info = BlockUsageIt.value();
+ Info.EntryCount++;
+ Info.DiskUsage += ChunkSize;
+ }
+ else
+ {
+ BlockUsage.insert_or_assign(BlockIndex,
+ BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
+ }
+ }
+ }
+ }
+
+ BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90);
+ BlockStoreCompactState BlockCompactState;
+ std::vector<IoHash> BlockCompactStateKeys;
+ BlockCompactState.IncludeBlocks(BlocksToCompact);
+
+ if (BlocksToCompact.size() > 0)
+ {
+ {
+ RwLock::SharedLockScope ___(m_Store.m_Lock);
+ for (const auto& Entry : m_Store.m_BlobLookup)
+ {
+ BlobIndex Index = Entry.second;
+
+ if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
+ {
+ if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location))
+ {
+ BlockCompactStateKeys.push_back(Entry.first);
+ }
+ }
+ }
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks",
+ m_Store.m_Config.RootDirectory,
+ BlocksToCompact.size());
+ }
+
+ m_Store.m_MetadataBlockStore.CompactBlocks(
+ BlockCompactState,
+ m_Store.m_Config.MetadataBlockStoreAlignement,
+ [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
+ std::vector<MetadataDiskEntry> MovedEntries;
+ MovedEntries.reserve(MovedArray.size());
+ RwLock::ExclusiveLockScope _(m_Store.m_Lock);
+ for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
+ {
+ size_t ChunkIndex = Moved.first;
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
+
+ ZEN_ASSERT(m_Store.m_TrackedCacheKeys);
+ if (m_Store.m_TrackedCacheKeys->contains(Key))
+ {
+ continue;
+ }
+
+ if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end())
+ {
+ const BlobIndex Index = It->second;
+
+ if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
+ {
+ m_Store.m_MetadataEntries[Meta].Location = Moved.second;
+ MovedEntries.push_back(
+ MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key});
+ }
+ }
+ }
+ m_Store.m_MetadatalogFile.Append(MovedEntries);
+ Stats.RemovedDisk += FreedDiskSpace;
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return false;
+ }
+ return true;
+ },
+ ClaimDiskReserveCallback,
+ fmt::format("GCV2: buildstore [COMPACT] '{}': ", m_Store.m_Config.RootDirectory));
+ }
+ else
+ {
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks",
+ m_Store.m_Config.RootDirectory,
+ BlocksToCompact.size());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ virtual std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string());
+ }
+
+private:
+ BuildStore& m_Store;
+ const std::vector<IoHash> m_RemovedBlobs;
+};
+
+GcStoreCompactor*
+BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
+{
+ ZEN_TRACE_CPU("Builds::RemoveExpiredData");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: buildstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}",
+ m_Config.RootDirectory,
+ Stats.CheckedCount,
+ Stats.FoundCount,
+ Stats.DeletedCount,
+ NiceBytes(Stats.FreedMemory),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ });
+
+ const GcClock::Tick ExpireTicks = Ctx.Settings.BuildStoreExpireTime.time_since_epoch().count();
+
+ std::vector<IoHash> ExpiredBlobs;
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ for (const auto& It : m_BlobLookup)
+ {
+ const BlobIndex ReadBlobIndex = It.second;
+ const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+
+ const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime;
+ if (AccessTick < ExpireTicks)
+ {
+ ExpiredBlobs.push_back(It.first);
+ }
+ }
+ Stats.CheckedCount += m_BlobLookup.size();
+ Stats.FoundCount += ExpiredBlobs.size();
+ }
+
+ std::vector<IoHash> RemovedBlobs;
+ if (!ExpiredBlobs.empty())
+ {
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ RemovedBlobs.reserve(ExpiredBlobs.size());
+
+ std::vector<PayloadDiskEntry> RemovedPayloads;
+ std::vector<MetadataDiskEntry> RemoveMetadatas;
+
+ RwLock::ExclusiveLockScope __(m_Lock);
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
+
+ for (const IoHash& ExpiredBlob : ExpiredBlobs)
+ {
+ if (auto It = m_BlobLookup.find(ExpiredBlob); It != m_BlobLookup.end())
+ {
+ const BlobIndex ReadBlobIndex = It->second;
+ const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+
+ const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime;
+
+ if (AccessTick < ExpireTicks)
+ {
+ if (ReadBlobEntry.Payload)
+ {
+ RemovedPayloads.push_back(
+ PayloadDiskEntry{.Entry = m_PayloadEntries[ReadBlobEntry.Payload], .BlobHash = ExpiredBlob});
+ RemovedPayloads.back().Entry.Flags |= PayloadEntry::kTombStone;
+ m_PayloadEntries[ReadBlobEntry.Payload] = {};
+ m_BlobEntries[ReadBlobIndex].Payload = {};
+ }
+ if (ReadBlobEntry.Metadata)
+ {
+ RemoveMetadatas.push_back(
+ MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob});
+ RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone;
+ m_MetadataEntries[ReadBlobEntry.Metadata] = {};
+ m_BlobEntries[ReadBlobIndex].Metadata = {};
+ }
+
+ m_BlobLookup.erase(It);
+
+ RemovedBlobs.push_back(ExpiredBlob);
+ Stats.DeletedCount++;
+ }
+ }
+ }
+ if (!RemovedPayloads.empty())
+ {
+ m_PayloadlogFile.Append(RemovedPayloads);
+ }
+ if (!RemoveMetadatas.empty())
+ {
+ m_MetadatalogFile.Append(RemoveMetadatas);
+ }
+ }
+ }
+
+ if (!RemovedBlobs.empty())
+ {
+ CompactState();
+ }
+
+ return new BuildStoreGcCompator(*this, std::move(RemovedBlobs));
+}
+
+std::vector<GcReferenceChecker*>
+BuildStore::CreateReferenceCheckers(GcCtx& Ctx)
+{
+ ZEN_UNUSED(Ctx);
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+ return {new BuildStoreGcReferenceChecker(*this)};
+}
+
+std::vector<GcReferenceValidator*>
+BuildStore::CreateReferenceValidators(GcCtx& Ctx)
+{
+ ZEN_UNUSED(Ctx);
+ return {};
+}
+
+std::vector<RwLock::SharedLockScope>
+BuildStore::LockState(GcCtx& Ctx)
+{
+ ZEN_UNUSED(Ctx);
+ std::vector<RwLock::SharedLockScope> Locks;
+ Locks.emplace_back(RwLock::SharedLockScope(m_Lock));
+ return Locks;
+}
+
+/*
+ ___________ __
+ \__ ___/___ _______/ |_ ______
+ | |_/ __ \ / ___/\ __\/ ___/
+ | |\ ___/ \___ \ | | \___ \
+ |____| \___ >____ > |__| /____ >
+ \/ \/ \/
+*/
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("BuildStore.Blobs")
+{
+ ScopedTemporaryDirectory _;
+
+ BuildStoreConfig Config;
+ Config.RootDirectory = _.Path() / "build_store";
+
+ std::vector<IoHash> CompressedBlobsHashes;
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+
+ for (size_t I = 0; I < 5; I++)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+ CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+ IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+ Store.PutBlob(CompressedBlobsHashes.back(), Payload);
+ }
+
+ for (const IoHash& RawHash : CompressedBlobsHashes)
+ {
+ IoBuffer Payload = Store.GetBlob(RawHash);
+ CHECK(Payload);
+ CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
+ CHECK(CompressedBlob);
+ CHECK(VerifyRawHash == RawHash);
+ IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
+ CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
+ }
+ }
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+ for (const IoHash& RawHash : CompressedBlobsHashes)
+ {
+ IoBuffer Payload = Store.GetBlob(RawHash);
+ CHECK(Payload);
+ CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
+ CHECK(CompressedBlob);
+ CHECK(VerifyRawHash == RawHash);
+ IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
+ CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
+ }
+
+ for (size_t I = 0; I < 5; I++)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7);
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+ CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+ IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+ Store.PutBlob(CompressedBlobsHashes.back(), Payload);
+ }
+ }
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+ for (const IoHash& RawHash : CompressedBlobsHashes)
+ {
+ IoBuffer Payload = Store.GetBlob(RawHash);
+ CHECK(Payload);
+ CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
+ CHECK(CompressedBlob);
+ CHECK(VerifyRawHash == RawHash);
+ IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
+ CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
+ }
+ }
+}
+
+namespace blockstore::testing {
+ IoBuffer MakeMetaData(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues)
+ {
+ CbObjectWriter Writer;
+ Writer.AddHash("rawHash"sv, BlobHash);
+ Writer.BeginObject("values");
+ {
+ for (const auto& V : KeyValues)
+ {
+ Writer.AddString(V.first, V.second);
+ }
+ }
+ Writer.EndObject(); // values
+ return Writer.Save().GetBuffer().AsIoBuffer();
+ };
+
+} // namespace blockstore::testing
+
+TEST_CASE("BuildStore.Metadata")
+{
+ using namespace blockstore::testing;
+
+ ScopedTemporaryDirectory _;
+
+ BuildStoreConfig Config;
+ Config.RootDirectory = _.Path() / "build_store";
+
+ std::vector<IoHash> BlobHashes;
+ std::vector<IoBuffer> MetaPayloads;
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+
+ for (size_t I = 0; I < 5; I++)
+ {
+ BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I)));
+ MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}}));
+ MetaPayloads.back().SetContentType(ZenContentType::kCbObject);
+ }
+ Store.PutMetadatas(BlobHashes, MetaPayloads);
+
+ std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
+ CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
+ for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
+ {
+ const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]);
+ CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash);
+ }
+ }
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+ std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
+ CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
+ for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
+ {
+ const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]);
+ CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash);
+ }
+ for (const IoHash& BlobHash : BlobHashes)
+ {
+ CHECK(!Store.GetBlob(BlobHash));
+ }
+ }
+ std::vector<IoHash> CompressedBlobsHashes;
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+ for (size_t I = 0; I < 5; I++)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+ CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+ IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+ Store.PutBlob(CompressedBlobsHashes.back(), Payload);
+ }
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ for (const auto& MetadataIt : MetadataPayloads)
+ {
+ CHECK(!MetadataIt);
+ }
+ for (const IoHash& BlobHash : CompressedBlobsHashes)
+ {
+ IoBuffer Blob = Store.GetBlob(BlobHash);
+ CHECK(Blob);
+ IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
+ CHECK(DecompressedBlob);
+ CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash);
+ }
+ }
+
+ std::vector<IoBuffer> BlobMetaPayloads;
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+ for (const IoHash& BlobHash : CompressedBlobsHashes)
+ {
+ BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+ BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
+ }
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
+ for (size_t I = 0; I < MetadataPayloads.size(); I++)
+ {
+ const IoBuffer& MetadataPayload = MetadataPayloads[I];
+ CHECK_EQ(IoHash::HashBuffer(MetadataPayload), IoHash::HashBuffer(BlobMetaPayloads[I]));
+ }
+ }
+
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
+ for (size_t I = 0; I < MetadataPayloads.size(); I++)
+ {
+ const IoBuffer& MetadataPayload = MetadataPayloads[I];
+ CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I]));
+ }
+ for (const IoHash& BlobHash : CompressedBlobsHashes)
+ {
+ IoBuffer Blob = Store.GetBlob(BlobHash);
+ CHECK(Blob);
+ IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
+ CHECK(DecompressedBlob);
+ CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash);
+ }
+
+ BlobMetaPayloads.clear();
+ for (const IoHash& BlobHash : CompressedBlobsHashes)
+ {
+ BlobMetaPayloads.push_back(
+ MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}}));
+ BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
+ }
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ }
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
+ for (size_t I = 0; I < MetadataPayloads.size(); I++)
+ {
+ const IoBuffer& MetadataPayload = MetadataPayloads[I];
+ CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I]));
+ }
+ for (const IoHash& BlobHash : CompressedBlobsHashes)
+ {
+ IoBuffer Blob = Store.GetBlob(BlobHash);
+ CHECK(Blob);
+ IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
+ CHECK(DecompressedBlob);
+ CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash);
+ }
+ }
+}
+
+TEST_CASE("BuildStore.GC")
+{
+ using namespace blockstore::testing;
+
+ ScopedTemporaryDirectory _;
+
+ BuildStoreConfig Config;
+ Config.RootDirectory = _.Path() / "build_store";
+
+ std::vector<IoHash> CompressedBlobsHashes;
+ std::vector<IoBuffer> BlobMetaPayloads;
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+ for (size_t I = 0; I < 5; I++)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+ CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+ IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+ Store.PutBlob(CompressedBlobsHashes.back(), Payload);
+ }
+ for (const IoHash& BlobHash : CompressedBlobsHashes)
+ {
+ BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+ BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
+ }
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ }
+ {
+ GcManager Gc;
+ BuildStore Store(Config, Gc);
+
+ {
+ GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = false,
+ .Verbose = true});
+ CHECK(!Result.WasCancelled);
+ for (const IoHash& BlobHash : CompressedBlobsHashes)
+ {
+ IoBuffer Blob = Store.GetBlob(BlobHash);
+ CHECK(Blob);
+ IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
+ CHECK(DecompressedBlob);
+ CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash);
+ }
+
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
+ for (size_t I = 0; I < MetadataPayloads.size(); I++)
+ {
+ const IoBuffer& MetadataPayload = MetadataPayloads[I];
+ CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I]));
+ }
+ }
+ {
+ GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .Verbose = true});
+ CHECK(!Result.WasCancelled);
+ for (const IoHash& BlobHash : CompressedBlobsHashes)
+ {
+ IoBuffer Blob = Store.GetBlob(BlobHash);
+ CHECK(!Blob);
+ }
+
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
+ for (size_t I = 0; I < MetadataPayloads.size(); I++)
+ {
+ const IoBuffer& MetadataPayload = MetadataPayloads[I];
+ CHECK(!MetadataPayload);
+ }
+ }
+ }
+}
+
+void
+buildstore_forcelink()
+{
+}
+
+#endif
+
+} // namespace zen
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 2be0542db..b64bc26dd 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -226,7 +226,7 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
}
std::vector<CasStore::InsertResult>
-CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes)
+CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes)
{
ZEN_MEMSCOPE(GetCasContainerTag());
@@ -323,7 +323,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
}
bool
-CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
+CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool,
uint64_t LargeSizeLimit)
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
index 07e620086..2eb4c233a 100644
--- a/src/zenstore/compactcas.h
+++ b/src/zenstore/compactcas.h
@@ -52,11 +52,11 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore
~CasContainerStrategy();
CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
- std::vector<CasStore::InsertResult> InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes);
+ std::vector<CasStore::InsertResult> InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes);
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(HashKeySet& InOutChunks);
- bool IterateChunks(std::span<IoHash> ChunkHashes,
+ bool IterateChunks(std::span<const IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool,
uint64_t LargeSizeLimit);
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 7ac10d613..fe5ae284b 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -1081,7 +1081,7 @@ GcManager::CollectGarbage(const GcSettings& Settings)
ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size());
{
ZEN_TRACE_CPU("GcV2::LockReferencers");
- // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until
+ // From this point we have blocked all writes to all References (DiskBucket/ProjectStore/BuildStore) until
// we delete the ReferenceLockers
Latch WorkLeft(1);
{
@@ -1108,7 +1108,7 @@ GcManager::CollectGarbage(const GcSettings& Settings)
ZEN_TRACE_CPU("GcV2::UpdateLockedState");
// Locking all references checkers so we have a steady state of which references are used
- // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until
+ // From this point we have blocked all writes to all References (DiskBucket/ProjectStore/BuildStore) until
// we delete the ReferenceCheckers
Latch WorkLeft(1);
@@ -1739,6 +1739,7 @@ GcScheduler::AppendGCLog(std::string_view Id, GcClock::TimePoint StartTime, cons
{
Writer << "CacheExpireTime"sv << ToDateTime(Settings.CacheExpireTime);
Writer << "ProjectStoreExpireTime"sv << ToDateTime(Settings.ProjectStoreExpireTime);
+ Writer << "BuildStoreExpireTime"sv << ToDateTime(Settings.BuildStoreExpireTime);
Writer << "CollectSmallObjects"sv << Settings.CollectSmallObjects;
Writer << "IsDeleteMode"sv << Settings.IsDeleteMode;
Writer << "SkipCidDelete"sv << Settings.SkipCidDelete;
@@ -1940,6 +1941,7 @@ GcScheduler::SchedulerThread()
std::chrono::seconds LightweightGcInterval = m_Config.LightweightInterval;
std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration;
std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration;
+ std::chrono::seconds MaxBuildStoreDuration = m_Config.MaxBuildStoreDuration;
uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit;
bool SkipCid = false;
GcVersion UseGCVersion = m_Config.UseGCVersion;
@@ -1975,6 +1977,10 @@ GcScheduler::SchedulerThread()
{
MaxProjectStoreDuration = TriggerParams.MaxProjectStoreDuration;
}
+ if (TriggerParams.MaxBuildStoreDuration != std::chrono::seconds::max())
+ {
+ MaxBuildStoreDuration = TriggerParams.MaxBuildStoreDuration;
+ }
if (TriggerParams.DiskSizeSoftLimit != 0)
{
DiskSizeSoftLimit = TriggerParams.DiskSizeSoftLimit;
@@ -2046,6 +2052,8 @@ GcScheduler::SchedulerThread()
MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration;
GcClock::TimePoint ProjectStoreExpireTime =
MaxProjectStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxProjectStoreDuration;
+ GcClock::TimePoint BuildStoreExpireTime =
+ MaxBuildStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxBuildStoreDuration;
const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
@@ -2102,6 +2110,10 @@ GcScheduler::SchedulerThread()
{
ProjectStoreExpireTime = SizeBasedExpireTime;
}
+ if (SizeBasedExpireTime > BuildStoreExpireTime)
+ {
+ BuildStoreExpireTime = SizeBasedExpireTime;
+ }
}
std::chrono::seconds RemainingTimeUntilGc =
@@ -2227,6 +2239,7 @@ GcScheduler::SchedulerThread()
bool GcSuccess = CollectGarbage(CacheExpireTime,
ProjectStoreExpireTime,
+ BuildStoreExpireTime,
DoDelete,
CollectSmallObjects,
SkipCid,
@@ -2333,6 +2346,7 @@ GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds Time
bool
GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
const GcClock::TimePoint& ProjectStoreExpireTime,
+ const GcClock::TimePoint& BuildStoreExpireTime,
bool Delete,
bool CollectSmallObjects,
bool SkipCid,
@@ -2416,6 +2430,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
const GcSettings Settings = {.CacheExpireTime = CacheExpireTime,
.ProjectStoreExpireTime = ProjectStoreExpireTime,
+ .BuildStoreExpireTime = BuildStoreExpireTime,
.CollectSmallObjects = CollectSmallObjects,
.IsDeleteMode = Delete,
.SkipCidDelete = SkipCid,
@@ -2447,6 +2462,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
}
SB.Append(fmt::format(" Cache cutoff time: {}\n", Settings.CacheExpireTime));
SB.Append(fmt::format(" Project store cutoff time: {}\n", Settings.ProjectStoreExpireTime));
+ SB.Append(fmt::format(" Build store cutoff time: {}\n", Settings.BuildStoreExpireTime));
};
{
@@ -2552,6 +2568,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
if (Delete)
{
GcClock::TimePoint KeepRangeStart = Min(CacheExpireTime, ProjectStoreExpireTime);
+ KeepRangeStart = Min(KeepRangeStart, BuildStoreExpireTime);
m_LastGcExpireTime = KeepRangeStart;
std::unique_lock Lock(m_GcMutex);
m_DiskUsageWindow.KeepRange(KeepRangeStart.time_since_epoch().count(), GcClock::Duration::max().count());
diff --git a/src/zenstore/include/zenstore/accesstime.h b/src/zenstore/include/zenstore/accesstime.h
new file mode 100644
index 000000000..a28dc908b
--- /dev/null
+++ b/src/zenstore/include/zenstore/accesstime.h
@@ -0,0 +1,47 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenstore/gc.h>
+
+#include <gsl/gsl-lite.hpp>
+
+namespace zen {
+
+// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch
+struct AccessTime
+{
+ explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {}
+ AccessTime& operator=(GcClock::Tick Tick) noexcept
+ {
+ SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed);
+ return *this;
+ }
+ operator GcClock::Tick() const noexcept
+ {
+ return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed)))
+ .count();
+ }
+
+ AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
+ AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
+ AccessTime& operator=(AccessTime&& Rhs) noexcept
+ {
+ SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed);
+ return *this;
+ }
+ AccessTime& operator=(const AccessTime& Rhs) noexcept
+ {
+ SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed);
+ return *this;
+ }
+
+private:
+ static uint32_t ToSeconds(GcClock::Tick Tick)
+ {
+ return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count());
+ }
+ std::atomic_uint32_t SecondsSinceEpoch;
+};
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 97357e5cb..0c72a13aa 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -156,7 +156,7 @@ public:
void WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback);
typedef std::function<void(std::span<BlockStoreLocation> Locations)> WriteChunksCallback;
- void WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback);
+ void WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback);
IoBuffer TryGetChunk(const BlockStoreLocation& Location) const;
void Flush(bool ForceNewBlock);
diff --git a/src/zenstore/include/zenstore/buildstore/buildstore.h b/src/zenstore/include/zenstore/buildstore/buildstore.h
new file mode 100644
index 000000000..302af5f9c
--- /dev/null
+++ b/src/zenstore/include/zenstore/buildstore/buildstore.h
@@ -0,0 +1,186 @@
+
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/blockstore.h>
+
+#include <zencore/iohash.h>
+#include <zenstore/accesstime.h>
+#include <zenstore/caslog.h>
+#include <zenstore/gc.h>
+#include "../compactcas.h"
+#include "../filecas.h"
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+struct BuildStoreConfig
+{
+ std::filesystem::path RootDirectory;
+ uint32_t SmallBlobBlockStoreMaxBlockSize = 256 * 1024 * 1024;
+ uint64_t SmallBlobBlockStoreMaxBlockEmbedSize = 1 * 1024 * 1024;
+ uint32_t SmallBlobBlockStoreAlignement = 16;
+ uint32_t MetadataBlockStoreMaxBlockSize = 64 * 1024 * 1024;
+ uint32_t MetadataBlockStoreAlignement = 8;
+};
+
+class BuildStore : public GcReferencer, public GcReferenceLocker //, public GcStorage
+{
+public:
+ explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc);
+ virtual ~BuildStore();
+
+ void PutBlob(const IoHash& BlobHashes, const IoBuffer& Payload);
+ IoBuffer GetBlob(const IoHash& BlobHashes);
+
+ struct BlobExistsResult
+ {
+ bool HasBody = 0;
+ bool HasMetadata = 0;
+ };
+
+ std::vector<BlobExistsResult> BlobsExists(std::span<const IoHash> BlobHashes);
+
+ void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas);
+ std::vector<IoBuffer> GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool);
+
+ void Flush();
+
+private:
+ void CompactState();
+
+ uint64_t ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount);
+ uint64_t ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount);
+
+ //////// GcReferencer
+ virtual std::string GetGcName(GcCtx& Ctx) override;
+ virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
+ virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
+ virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override;
+
+ //////// GcReferenceLocker
+ virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override;
+
+#pragma pack(push)
+#pragma pack(1)
+ struct PayloadEntry
+ {
+ static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
+ static const uint8_t kStandalone = 0x20u; // This payload is stored as a standalone value
+
+ uint8_t Flags = 0;
+ uint8_t Reserved1 = 0;
+ uint8_t Reserved2 = 0;
+ uint8_t Reserved3 = 0;
+ };
+ static_assert(sizeof(PayloadEntry) == 4);
+
+ struct PayloadDiskEntry
+ {
+ PayloadEntry Entry; // 4 bytes
+ IoHash BlobHash; // 20 bytes
+ };
+ static_assert(sizeof(PayloadDiskEntry) == 24);
+
+ struct MetadataEntry
+ {
+ BlockStoreLocation Location; // 12 bytes
+
+ ZenContentType ContentType = ZenContentType::kCOUNT; // 1 byte
+ static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
+ uint8_t Flags = 0; // 1 byte
+
+ uint8_t Reserved1 = 0;
+ uint8_t Reserved2 = 0;
+ };
+ static_assert(sizeof(MetadataEntry) == 16);
+
+ struct MetadataDiskEntry
+ {
+ MetadataEntry Entry; // 16 bytes
+ IoHash BlobHash; // 20 bytes
+ uint8_t Reserved1 = 0;
+ uint8_t Reserved2 = 0;
+ uint8_t Reserved3 = 0;
+ uint8_t Reserved4 = 0;
+ };
+ static_assert(sizeof(MetadataDiskEntry) == 40);
+
+#pragma pack(pop)
+
+ static bool ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason);
+ static bool ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason);
+
+ struct PayloadIndex
+ {
+ uint32_t Index = std::numeric_limits<uint32_t>::max();
+
+ operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
+ PayloadIndex() = default;
+ explicit PayloadIndex(size_t InIndex) : Index(uint32_t(InIndex)) {}
+ operator size_t() const { return Index; };
+ inline auto operator<=>(const PayloadIndex& Other) const = default;
+ };
+
+ struct MetadataIndex
+ {
+ uint32_t Index = std::numeric_limits<uint32_t>::max();
+
+ operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
+ MetadataIndex() = default;
+ explicit MetadataIndex(size_t InIndex) : Index(uint32_t(InIndex)) {}
+ operator size_t() const { return Index; };
+ inline auto operator<=>(const MetadataIndex& Other) const = default;
+ };
+
+ struct BlobIndex
+ {
+ uint32_t Index = std::numeric_limits<uint32_t>::max();
+
+ operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
+ BlobIndex() = default;
+ explicit BlobIndex(size_t InIndex) : Index(uint32_t(InIndex)) {}
+ operator size_t() const { return Index; };
+ inline auto operator<=>(const BlobIndex& Other) const = default;
+ };
+
+ struct BlobEntry
+ {
+ PayloadIndex Payload;
+ MetadataIndex Metadata;
+ AccessTime LastAccessTime;
+ };
+ static_assert(sizeof(BlobEntry) == 12);
+
+ const BuildStoreConfig m_Config;
+ GcManager& m_Gc;
+
+ RwLock m_Lock;
+
+ std::vector<PayloadEntry> m_PayloadEntries;
+ std::vector<MetadataEntry> m_MetadataEntries;
+
+ std::vector<BlobEntry> m_BlobEntries;
+ tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> m_BlobLookup;
+
+ FileCasStrategy m_LargeBlobStore;
+ CasContainerStrategy m_SmallBlobStore;
+ BlockStore m_MetadataBlockStore;
+
+ TCasLogFile<PayloadDiskEntry> m_PayloadlogFile;
+ TCasLogFile<MetadataDiskEntry> m_MetadatalogFile;
+ uint64_t m_BlobLogFlushPosition = 0;
+ uint64_t m_MetaLogFlushPosition = 0;
+
+ std::unique_ptr<HashSet> m_TrackedCacheKeys;
+
+ friend class BuildStoreGcReferenceChecker;
+ friend class BuildStoreGcReferencePruner;
+ friend class BuildStoreGcCompator;
+};
+
+void buildstore_forcelink();
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 05400c784..5a51718d3 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -5,6 +5,7 @@
#include "cacheshared.h"
#include <zencore/stats.h>
+#include <zenstore/accesstime.h>
#include <zenstore/blockstore.h>
#include <zenstore/caslog.h>
diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index 521c78bb1..ef1b803de 100644
--- a/src/zenstore/include/zenstore/cache/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
@@ -72,42 +72,4 @@ struct CacheContentStats
bool IsKnownBadBucketName(std::string_view BucketName);
bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer);
-//////////////////////////////////////////////////////////////////////////
-
-// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch
-struct AccessTime
-{
- explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {}
- AccessTime& operator=(GcClock::Tick Tick) noexcept
- {
- SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed);
- return *this;
- }
- operator GcClock::Tick() const noexcept
- {
- return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed)))
- .count();
- }
-
- AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
- AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
- AccessTime& operator=(AccessTime&& Rhs) noexcept
- {
- SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed);
- return *this;
- }
- AccessTime& operator=(const AccessTime& Rhs) noexcept
- {
- SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed);
- return *this;
- }
-
-private:
- static uint32_t ToSeconds(GcClock::Tick Tick)
- {
- return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count());
- }
- std::atomic_uint32_t SecondsSinceEpoch;
-};
-
} // namespace zen
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 3daae0a93..67aadef71 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -55,6 +55,7 @@ struct GcSettings
{
GcClock::TimePoint CacheExpireTime = GcClock::Now();
GcClock::TimePoint ProjectStoreExpireTime = GcClock::Now();
+ GcClock::TimePoint BuildStoreExpireTime = GcClock::Now();
bool CollectSmallObjects = false;
bool IsDeleteMode = false;
bool SkipCidDelete = false;
@@ -412,6 +413,7 @@ struct GcSchedulerConfig
std::chrono::seconds Interval{};
std::chrono::seconds MaxCacheDuration{86400};
std::chrono::seconds MaxProjectStoreDuration{604800};
+ std::chrono::seconds MaxBuildStoreDuration{604800};
bool CollectSmallObjects = true;
bool Enabled = true;
uint64_t DiskReserveSize = 1ul << 28;
@@ -496,6 +498,7 @@ public:
bool CollectSmallObjects = false;
std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max();
std::chrono::seconds MaxProjectStoreDuration = std::chrono::seconds::max();
+ std::chrono::seconds MaxBuildStoreDuration = std::chrono::seconds::max();
uint64_t DiskSizeSoftLimit = 0;
bool SkipCid = false;
bool SkipDelete = false;
@@ -528,6 +531,7 @@ private:
void SchedulerThread();
bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
const GcClock::TimePoint& ProjectStoreExpireTime,
+ const GcClock::TimePoint& BuildStoreExpireTime,
bool Delete,
bool CollectSmallObjects,
bool SkipCid,
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
new file mode 100644
index 000000000..c95215889
--- /dev/null
+++ b/src/zenutil/buildstoragecache.cpp
@@ -0,0 +1,362 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/buildstoragecache.h>
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/packageformat.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+class ZenBuildStorageCache : public BuildStorageCache
+{
+public:
+ explicit ZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath)
+ : m_HttpClient(HttpClient)
+ , m_Stats(Stats)
+ , m_Namespace(Namespace.empty() ? "none" : Namespace)
+ , m_Bucket(Bucket.empty() ? "none" : Bucket)
+ , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred())
+ , m_BackgroundWorkPool(1)
+ , m_PendingBackgroundWorkCount(1)
+ , m_CancelBackgroundWork(false)
+ {
+ }
+
+ virtual ~ZenBuildStorageCache()
+ {
+ try
+ {
+ m_CancelBackgroundWork.store(true);
+ m_PendingBackgroundWorkCount.CountDown();
+ m_PendingBackgroundWorkCount.Wait();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what());
+ }
+ }
+
+ void ScheduleBackgroundWork(std::function<void()>&& Work)
+ {
+ m_PendingBackgroundWorkCount.AddCount(1);
+ try
+ {
+ m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork");
+ auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); });
+ if (!m_CancelBackgroundWork)
+ {
+ try
+ {
+ Work();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what());
+ }
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what());
+ }
+ }
+
+ virtual void PutBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload) override
+ {
+ ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
+ ScheduleBackgroundWork(
+ [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob");
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ Payload,
+ ContentType);
+ AddStatistic(CacheResponse);
+ if (!CacheResponse.IsSuccess())
+ {
+ ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv));
+ }
+ });
+ }
+
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ HttpClient::KeyValueMap Headers;
+ if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
+ {
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)});
+ }
+ CreateDirectories(m_TempFolderPath);
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ m_TempFolderPath,
+ Headers);
+ AddStatistic(CacheResponse);
+ if (CacheResponse.IsSuccess())
+ {
+ return CacheResponse.ResponsePayload;
+ }
+ return {};
+ }
+
+ virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override
+ {
+ ScheduleBackgroundWork([this,
+ BuildId = Oid(BuildId),
+ BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()),
+ MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ const uint64_t BlobCount = BlobRawHashes.size();
+
+ CbPackage RequestPackage;
+ std::vector<CbAttachment> Attachments;
+ tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes;
+ Attachments.reserve(BlobCount);
+ AttachmentHashes.reserve(BlobCount);
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.BeginArray("blobHashes");
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
+ {
+ RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]);
+ }
+ RequestWriter.EndArray(); // blobHashes
+
+ RequestWriter.BeginArray("metadatas");
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
+ {
+ const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash();
+ RequestWriter.AddBinaryAttachment(ObjectHash);
+ if (!AttachmentHashes.contains(ObjectHash))
+ {
+ Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash));
+ AttachmentHashes.insert(ObjectHash);
+ }
+ }
+
+ RequestWriter.EndArray(); // metadatas
+
+ RequestPackage.SetObject(RequestWriter.Save());
+ }
+ RequestPackage.AddAttachments(Attachments);
+
+ CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage);
+
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId),
+ RpcRequestBuffer,
+ ZenContentType::kCbPackage);
+ AddStatistic(CacheResponse);
+ if (!CacheResponse.IsSuccess())
+ {
+ ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv));
+ }
+ });
+ }
+
+ virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ CbObjectWriter Request;
+
+ Request.BeginArray("blobHashes"sv);
+ for (const IoHash& BlobHash : BlobHashes)
+ {
+ Request.AddHash(BlobHash);
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ AddStatistic(Response);
+ if (Response.IsSuccess())
+ {
+ std::vector<CbObject> Result;
+
+ CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
+ CbObject ResponseObject = ResponsePackage.GetObject();
+
+ CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView();
+ CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView();
+ Result.reserve(MetadatasArray.Num());
+ auto BlobHashesIt = BlobHashes.begin();
+ auto BlobHashArrayIt = begin(BlobHashArray);
+ auto MetadataArrayIt = begin(MetadatasArray);
+ while (MetadataArrayIt != end(MetadatasArray))
+ {
+ const IoHash BlobHash = (*BlobHashArrayIt).AsHash();
+ while (BlobHash != *BlobHashesIt)
+ {
+ ZEN_ASSERT(BlobHashesIt != BlobHashes.end());
+ BlobHashesIt++;
+ }
+
+ ZEN_ASSERT(BlobHash == *BlobHashesIt);
+
+ const IoHash MetaHash = (*MetadataArrayIt).AsAttachment();
+ const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash);
+ ZEN_ASSERT(MetaAttachment);
+
+ CbObject Metadata = MetaAttachment->AsObject();
+ Result.emplace_back(std::move(Metadata));
+
+ BlobHashArrayIt++;
+ MetadataArrayIt++;
+ BlobHashesIt++;
+ }
+ return Result;
+ }
+ return {};
+ }
+
+ virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ CbObjectWriter Request;
+
+ Request.BeginArray("blobHashes"sv);
+ for (const IoHash& BlobHash : BlobHashes)
+ {
+ Request.AddHash(BlobHash);
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ AddStatistic(Response);
+ if (Response.IsSuccess())
+ {
+ CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload);
+ if (!ResponseObject)
+ {
+ throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object");
+ }
+ CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView();
+ if (!BlobsExistsArray)
+ {
+ throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing");
+ }
+ if (BlobsExistsArray.Num() != BlobHashes.size())
+ {
+ throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}",
+ BlobsExistsArray.Num(),
+ BlobHashes.size()));
+ }
+
+ CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView();
+ if (!MetadatasExistsArray)
+ {
+ throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing");
+ }
+ if (MetadatasExistsArray.Num() != BlobHashes.size())
+ {
+ throw std::runtime_error(
+ fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}",
+ MetadatasExistsArray.Num(),
+ BlobHashes.size()));
+ }
+
+ std::vector<BlobExistsResult> Result;
+ Result.reserve(BlobHashes.size());
+ auto BlobExistsIt = begin(BlobsExistsArray);
+ auto MetadataExistsIt = begin(MetadatasExistsArray);
+ while (BlobExistsIt != end(BlobsExistsArray))
+ {
+ ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray));
+
+ const bool HasBody = (*BlobExistsIt).AsBool();
+ const bool HasMetadata = (*MetadataExistsIt).AsBool();
+
+ Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata});
+
+ BlobExistsIt++;
+ MetadataExistsIt++;
+ }
+ return Result;
+ }
+ return {};
+ }
+
+private:
+ void AddStatistic(const HttpClient::Response& Result)
+ {
+ m_Stats.TotalBytesWritten += Result.UploadedBytes;
+ m_Stats.TotalBytesRead += Result.DownloadedBytes;
+ m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
+ m_Stats.TotalRequestCount++;
+ }
+
+ HttpClient& m_HttpClient;
+ BuildStorageCache::Statistics& m_Stats;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const std::filesystem::path m_TempFolderPath;
+
+ WorkerThreadPool m_BackgroundWorkPool;
+ Latch m_PendingBackgroundWorkCount;
+ std::atomic<bool> m_CancelBackgroundWork;
+};
+
+std::unique_ptr<BuildStorageCache>
+CreateZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath)
+{
+ return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath);
+}
+
+} // namespace zen
diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp
index f3c14edc4..abfc0fb63 100644
--- a/src/zenutil/chunkblock.cpp
+++ b/src/zenutil/chunkblock.cpp
@@ -52,7 +52,7 @@ ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject)
return {};
}
std::vector<ChunkBlockDescription> Result;
- CbArrayView Blocks = BlocksObject["blocks"].AsArrayView();
+ CbArrayView Blocks = BlocksObject["blocks"sv].AsArrayView();
Result.reserve(Blocks.Num());
for (CbFieldView BlockView : Blocks)
{
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index 130fec355..f040e9ece 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -442,18 +442,19 @@ public:
SimulateLatency(0, 0);
}
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override
+ virtual CbObject FindBlocks(const Oid& BuildId) override
{
ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+ SimulateLatency(sizeof(BuildId), 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
m_Stats.TotalRequestCount++;
DirectoryContent Content;
GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content);
- std::vector<ChunkBlockDescription> Result;
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
for (const std::filesystem::path& MetaDataFile : Content.Files)
{
IoHash ChunkHash;
@@ -467,24 +468,28 @@ public:
m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Result.emplace_back(ParseChunkBlockDescription(BlockObject));
+ Writer.AddObject(BlockObject);
}
}
}
- SimulateLatency(0, sizeof(IoHash) * Result.size());
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
return Result;
}
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+ SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
m_Stats.TotalRequestCount++;
- std::vector<ChunkBlockDescription> Result;
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
+
for (const IoHash& BlockHash : BlockHashes)
{
std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash);
@@ -495,10 +500,12 @@ public:
m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Result.emplace_back(ParseChunkBlockDescription(BlockObject));
+ Writer.AddObject(BlockObject);
}
}
- SimulateLatency(sizeof(BlockHashes) * BlockHashes.size(), sizeof(ChunkBlockDescription) * Result.size());
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
return Result;
}
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
index 2ebd65a00..f8c7c012c 100644
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -54,9 +54,9 @@ public:
uint64_t ChunkSize,
std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0;
- virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) = 0;
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
+ virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
+ virtual CbObject FindBlocks(const Oid& BuildId) = 0;
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0;
};
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
new file mode 100644
index 000000000..08c936bf5
--- /dev/null
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -0,0 +1,52 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compositebuffer.h>
+#include <zenutil/chunkblock.h>
+
+namespace zen {
+
+class HttpClient;
+
+class BuildStorageCache
+{
+public:
+ struct Statistics
+ {
+ std::atomic<uint64_t> TotalBytesRead = 0;
+ std::atomic<uint64_t> TotalBytesWritten = 0;
+ std::atomic<uint64_t> TotalRequestCount = 0;
+ std::atomic<uint64_t> TotalRequestTimeUs = 0;
+ std::atomic<uint64_t> TotalExecutionTimeUs = 0;
+ };
+
+ virtual ~BuildStorageCache() {}
+
+ virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0;
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t RangeOffset = 0,
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
+
+ virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0;
+ virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+
+ struct BlobExistsResult
+ {
+ bool HasBody = 0;
+ bool HasMetadata = 0;
+ };
+
+ virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+};
+
+std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath);
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
index 758722156..cd28bdcb2 100644
--- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h
+++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
@@ -27,7 +27,6 @@ public:
{
ZEN_MEMSCOPE(ELLMTag::Logging);
- ZEN_MEMSCOPE(ELLMTag::Logging);
std::error_code Ec;
if (RotateOnOpen)
{
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
index d70fd8c00..b6d9e3990 100644
--- a/src/zenutil/jupiter/jupiterbuildstorage.cpp
+++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp
@@ -49,7 +49,7 @@ public:
{
throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode));
}
- return PayloadToJson("Failed listing builds"sv, ListResult.Response);
+ return PayloadToCbObject("Failed listing builds"sv, ListResult.Response);
}
virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
@@ -66,7 +66,7 @@ public:
{
throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode));
}
- return PayloadToJson(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
+ return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
}
virtual CbObject GetBuild(const Oid& BuildId) override
@@ -81,7 +81,7 @@ public:
{
throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode));
}
- return PayloadToJson(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
+ return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
}
virtual void FinalizeBuild(const Oid& BuildId) override
@@ -134,7 +134,7 @@ public:
GetBuildPartResult.Reason,
GetBuildPartResult.ErrorCode));
}
- return PayloadToJson(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
+ return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
}
virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
@@ -289,7 +289,7 @@ public:
}
}
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override
+ virtual CbObject FindBlocks(const Oid& BuildId) override
{
ZEN_TRACE_CPU("Jupiter::FindBlocks");
@@ -301,10 +301,10 @@ public:
{
throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode));
}
- return ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching known blocks"sv, FindResult.Response));
+ return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response);
}
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
ZEN_TRACE_CPU("Jupiter::GetBlockMetadata");
@@ -328,24 +328,7 @@ public:
throw std::runtime_error(
fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode));
}
- std::vector<ChunkBlockDescription> UnorderedList =
- ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching block metadatas", GetBlockMetadataResult.Response));
- tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
- for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
- {
- const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
- BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
- }
- std::vector<ChunkBlockDescription> SortedBlockDescriptions;
- SortedBlockDescriptions.reserve(BlockDescriptionLookup.size());
- for (const IoHash& BlockHash : BlockHashes)
- {
- if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end())
- {
- SortedBlockDescriptions.push_back(std::move(UnorderedList[It->second]));
- }
- }
- return SortedBlockDescriptions;
+ return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response);
}
virtual void PutBuildPartStats(const Oid& BuildId,
@@ -373,7 +356,7 @@ public:
}
private:
- static CbObject PayloadToJson(std::string_view Context, const IoBuffer& Payload)
+ static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload)
{
if (Payload.GetContentType() == ZenContentType::kJSON)
{