aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/buildstore/buildstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/buildstore/buildstore.cpp')
-rw-r--r--src/zenstore/buildstore/buildstore.cpp2053
1 files changed, 2053 insertions, 0 deletions
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
new file mode 100644
index 000000000..20dc55bca
--- /dev/null
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -0,0 +1,2053 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/buildstore/buildstore.h>
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory/llm.h>
+#include <zencore/scopeguard.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenutil/parallelwork.h>
+
+#include <zencore/uid.h>
+#include <zencore/xxhash.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compress.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zenutil/workerpools.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+// Returns the FLLMTag that the build store functions hand to ZEN_MEMSCOPE.
+//
+// The tag is a function-local static built from the name "store" and an
+// FLLMTag("builds"); the same instance is returned on every call.
+const FLLMTag&
+GetBuildstoreTag()
+{
+    static FLLMTag BuildstoreTag("store", FLLMTag("builds"));
+    return BuildstoreTag;
+}
+
+using namespace std::literals;
+
+namespace blobstore::impl {
+
+ // Base file name shared by every file the path helpers below place in the root directory.
+ const std::string BaseName = "builds";
+ // Extension of the manifest file (see GetManifestPath).
+ const std::string ManifestExtension = ".cbo";
+ // Extension appended by the index path helpers (GetBlobIndexPath / GetMetaIndexPath).
+ const char* IndexExtension = ".uidx";
+ // Extension appended by the log path helpers (GetBlobLogPath / GetMetaLogPath).
+ const char* LogExtension = ".slog";
+ // Extension of the access times file (see GetAccessTimesPath).
+ const char* AccessTimeExtension = ".zacs";
+
+ // Version value stored in the manifest: three fields packed as (1 << 16) | (0 << 8) | 0
+ // (presumably major/minor/patch - confirm). The BuildStore constructor discards existing
+ // state when the stored version differs from this value.
+ const uint32_t ManifestVersion = (1 << 16) | (0 << 8) | (0);
+
+    // Path of the manifest file: RootDirectory / (BaseName + ManifestExtension).
+    std::filesystem::path GetManifestPath(const std::filesystem::path& RootDirectory)
+    {
+        const std::string FileName = BaseName + ManifestExtension;
+        return RootDirectory / FileName;
+    }
+
+    // Path of the blob index file: RootDirectory / (BaseName + IndexExtension).
+    std::filesystem::path GetBlobIndexPath(const std::filesystem::path& RootDirectory)
+    {
+        const std::string FileName = BaseName + IndexExtension;
+        return RootDirectory / FileName;
+    }
+
+    // Path of the blob (payload) log file: RootDirectory / (BaseName + LogExtension).
+    std::filesystem::path GetBlobLogPath(const std::filesystem::path& RootDirectory)
+    {
+        const std::string FileName = BaseName + LogExtension;
+        return RootDirectory / FileName;
+    }
+
+    // Path of the metadata index file: RootDirectory / (BaseName + "_meta" + IndexExtension).
+    std::filesystem::path GetMetaIndexPath(const std::filesystem::path& RootDirectory)
+    {
+        const std::string FileName = BaseName + "_meta" + IndexExtension;
+        return RootDirectory / FileName;
+    }
+
+    // Path of the metadata log file: RootDirectory / (BaseName + "_meta" + LogExtension).
+    std::filesystem::path GetMetaLogPath(const std::filesystem::path& RootDirectory)
+    {
+        const std::string FileName = BaseName + "_meta" + LogExtension;
+        return RootDirectory / FileName;
+    }
+
+    // Path of the access times file: RootDirectory / (BaseName + AccessTimeExtension).
+    std::filesystem::path GetAccessTimesPath(const std::filesystem::path& RootDirectory)
+    {
+        const std::string FileName = BaseName + AccessTimeExtension;
+        return RootDirectory / FileName;
+    }
+
+ // One entry of the access times file: a blob key and its last access time in seconds.
+ // WriteAccessTimes writes an array of these as raw bytes and ReadAccessTimes reads the
+ // array back the same way.
+ struct AccessTimeRecord
+ {
+ IoHash Key;
+ std::uint32_t SecondsSinceEpoch = 0;
+ };
+
+ // The records are stored as raw memory, so the struct size is pinned at compile time.
+ static_assert(sizeof(AccessTimeRecord) == 24);
+
+// Header placed at offset 0 of the access times file. Packed to 1-byte alignment so the
+// in-memory layout is exactly the 16 bytes that are written to / read from disk.
+#pragma pack(push)
+#pragma pack(1)
+ struct AccessTimesHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x7363617a; // 'zacs';
+ static constexpr uint32_t CurrentVersion = 1;
+ // The record array starts at the header size rounded up to this alignment.
+ static constexpr uint64_t DataAlignment = 8;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ // Number of AccessTimeRecord entries that follow the header.
+ uint32_t AccessTimeCount = 0;
+ // Value of ComputeChecksum() for this header; must remain the last field.
+ uint32_t Checksum = 0;
+
+ // Hashes every header byte that precedes the Checksum field (starting at Magic,
+ // sizeof(header) minus the 4 checksum bytes) using XXH32 with a fixed seed.
+ static uint32_t ComputeChecksum(const AccessTimesHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(AccessTimesHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+#pragma pack(pop)
+
+ // Guards the on-disk header size against accidental layout changes.
+ static_assert(sizeof(AccessTimesHeader) == 16);
+
+} // namespace blobstore::impl
+
+// Opens the build store rooted at Config.RootDirectory, or creates a fresh one.
+//
+// Existing state is loaded only when the manifest, blob log and metadata log files all
+// exist and the manifest carries a non-zero id and the expected ManifestVersion. In every
+// other case the root directory is cleaned and a new manifest is written.
+// Metadata entries that point into metadata blocks reported missing by the block store are
+// tombstoned in the metadata log and dropped from the in-memory state.
+// Any std::exception thrown during setup is logged and swallowed here (not rethrown).
+BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc)
+: m_Log(logging::Get("builds"))
+, m_Config(Config)
+, m_Gc(Gc)
+, m_LargeBlobStore(m_Gc)
+, m_SmallBlobStore(Gc)
+, m_MetadataBlockStore()
+{
+ ZEN_TRACE_CPU("BuildStore::BuildStore");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+ try
+ {
+ bool IsNew = true;
+ Stopwatch Timer;
+ // Logs the outcome ("Initialized" vs "Read") and the elapsed time when the guard is destroyed.
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("{} build store at {} in {}",
+ IsNew ? "Initialized" : "Read",
+ m_Config.RootDirectory,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ std::filesystem::path BlobLogPath = blobstore::impl::GetBlobLogPath(Config.RootDirectory);
+ std::filesystem::path MetaLogPath = blobstore::impl::GetMetaLogPath(Config.RootDirectory);
+ std::filesystem::path ManifestPath = blobstore::impl::GetManifestPath(Config.RootDirectory);
+ std::filesystem::path AccessTimesPath = blobstore::impl::GetAccessTimesPath(Config.RootDirectory);
+ // All three files must be present for the store to count as existing.
+ if (IsFile(ManifestPath) && IsFile(BlobLogPath) && IsFile(MetaLogPath))
+ {
+ IsNew = false;
+ }
+
+ if (!IsNew)
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+
+ CbObject ManifestReader = LoadCompactBinaryObject(ReadFile(ManifestPath).Flatten());
+ Oid ManifestId = ManifestReader["id"].AsObjectId();
+ uint32_t Version = ManifestReader["version"].AsUInt32();
+ DateTime CreationDate = ManifestReader["createdAt"].AsDateTime();
+ ZEN_UNUSED(CreationDate);
+ // A zero id or an unexpected version makes the store start over from scratch.
+ if (ManifestId == Oid::Zero || Version != blobstore::impl::ManifestVersion)
+ {
+ ZEN_WARN("Invalid manifest at {}, wiping state", ManifestPath);
+ IsNew = true;
+ }
+ else
+ {
+ // Rebuild the in-memory index by replaying both logs from their first entry.
+ m_BlobLogFlushPosition = ReadPayloadLog(Lock, BlobLogPath, 0);
+ m_MetaLogFlushPosition = ReadMetadataLog(Lock, MetaLogPath, 0);
+ if (IsFile(AccessTimesPath))
+ {
+ ReadAccessTimes(Lock, AccessTimesPath);
+ }
+ }
+ }
+
+ if (IsNew)
+ {
+ // Fresh store: clean the root directory and write a manifest with a new id.
+ CleanDirectory(Config.RootDirectory, false);
+ CbObjectWriter ManifestWriter;
+ ManifestWriter.AddObjectId("id", Oid::NewOid());
+ ManifestWriter.AddInteger("version", blobstore::impl::ManifestVersion);
+ ManifestWriter.AddDateTime("createdAt", DateTime::Now());
+ TemporaryFile::SafeWriteFile(ManifestPath, ManifestWriter.Save().GetBuffer().AsIoBuffer());
+ }
+ m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew);
+ m_SmallBlobStore.Initialize(Config.RootDirectory,
+ "blob_cas",
+ m_Config.SmallBlobBlockStoreMaxBlockSize,
+ m_Config.SmallBlobBlockStoreAlignement,
+ IsNew);
+ m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20);
+
+ // Collect every metadata block index referenced by the loaded blob entries.
+ BlockStore::BlockIndexSet KnownBlocks;
+ for (const BlobEntry& Blob : m_BlobEntries)
+ {
+ if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex)
+ {
+ const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex];
+ KnownBlocks.insert(Metadata.Location.BlockIndex);
+ }
+ }
+ // The returned set is treated below as the known blocks that are missing on disk.
+ BlockStore::BlockIndexSet MissingBlocks = m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+
+ m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite);
+ m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite);
+
+ if (!MissingBlocks.empty())
+ {
+ // Pass 1: detach every metadata entry located in a missing block and queue a
+ // tombstone record for it.
+ std::vector<MetadataDiskEntry> MissingMetadatas;
+ for (auto& It : m_BlobLookup)
+ {
+ const IoHash& BlobHash = It.first;
+ const BlobIndex ReadBlobIndex = It.second;
+ const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+ if (ReadBlobEntry.Metadata)
+ {
+ const MetadataEntry& MetaData = m_MetadataEntries[ReadBlobEntry.Metadata];
+ if (MissingBlocks.contains(MetaData.Location.BlockIndex))
+ {
+ MissingMetadatas.push_back(
+ MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = BlobHash});
+ MissingMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone;
+ m_MetadataEntries[ReadBlobEntry.Metadata] = {};
+ m_BlobEntries[ReadBlobIndex].Metadata = {};
+ }
+ }
+ }
+ // NOTE(review): KnownBlocks is gathered from m_BlobEntries while the scan above walks
+ // m_BlobLookup. Log replay erases tombstoned blobs from m_BlobLookup only, so a missing
+ // block referenced solely by such a stale entry could leave this list empty - confirm.
+ ZEN_ASSERT(!MissingMetadatas.empty());
+
+ // Pass 2: a blob that lost its metadata and has no payload either is removed from the lookup.
+ for (const MetadataDiskEntry& Entry : MissingMetadatas)
+ {
+ auto It = m_BlobLookup.find(Entry.BlobHash);
+ ZEN_ASSERT(It != m_BlobLookup.end());
+
+ const BlobIndex ReadBlobIndex = It->second;
+ const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+ if (!ReadBlobEntry.Payload)
+ {
+ m_BlobLookup.erase(It);
+ }
+ }
+ // Persist the tombstones, then rebuild the in-memory arrays without the removed entries.
+ m_MetadatalogFile.Append(MissingMetadatas);
+ CompactState();
+ }
+
+ m_Gc.AddGcReferencer(*this);
+ m_Gc.AddGcReferenceLocker(*this);
+ m_Gc.AddGcStorage(this);
+ }
+ catch (const std::exception& Ex)
+ {
+ // Log the failure and remove the GC registrations; the exception is not propagated.
+ ZEN_ERROR("Failed to initialize build store. Reason: '{}'", Ex.what());
+ m_Gc.RemoveGcStorage(this);
+ m_Gc.RemoveGcReferenceLocker(*this);
+ m_Gc.RemoveGcReferencer(*this);
+ }
+}
+
+// Unregisters the store from the GC manager, flushes pending state and closes both log files.
+// std::exception is caught and logged so that it does not escape the destructor.
+BuildStore::~BuildStore()
+{
+ try
+ {
+ ZEN_TRACE_CPU("BuildStore::~BuildStore");
+ // Removal happens in the reverse order of the registration done in the constructor.
+ m_Gc.RemoveGcStorage(this);
+ m_Gc.RemoveGcReferenceLocker(*this);
+ m_Gc.RemoveGcReferencer(*this);
+ // Flush() logs its own failures; the logs are closed after it returns.
+ Flush();
+ m_MetadatalogFile.Close();
+ m_PayloadlogFile.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~BuildStore() threw exception: {}", Ex.what());
+ }
+}
+
+// Stores the payload for BlobHash and records it in the index and the payload log.
+//
+// Payload must have content type ZenContentType::kCompressedBinary (asserted). If the blob
+// already has a payload when the call starts, nothing is done. Payloads larger than
+// m_Config.SmallBlobBlockStoreMaxBlockEmbedSize go to the large blob store and are flagged
+// PayloadEntry::kStandalone; everything else goes to the small blob store.
+void
+BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
+{
+    ZEN_TRACE_CPU("BuildStore::PutBlob");
+    ZEN_MEMSCOPE(GetBuildstoreTag());
+    ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCompressedBinary);
+
+    // Cheap check under the shared lock: bail out when a payload is already indexed.
+    {
+        RwLock::SharedLockScope ReadLock(m_Lock);
+        const auto              KnownIt = m_BlobLookup.find(BlobHash);
+        if (KnownIt != m_BlobLookup.end() && m_BlobEntries[KnownIt->second].Payload)
+        {
+            return;
+        }
+    }
+
+    // Write the payload to the matching store without holding m_Lock.
+    const uint64_t PayloadSize = Payload.GetSize();
+    PayloadEntry   Entry;
+    if (PayloadSize <= m_Config.SmallBlobBlockStoreMaxBlockEmbedSize)
+    {
+        CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash);
+        ZEN_UNUSED(Result);
+        Entry = PayloadEntry(0, PayloadSize);
+    }
+    else
+    {
+        CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash);
+        ZEN_UNUSED(Result);
+        Entry = PayloadEntry(PayloadEntry::kStandalone, PayloadSize);
+    }
+
+    // Publish the entry in the in-memory index under the exclusive lock.
+    {
+        RwLock::ExclusiveLockScope WriteLock(m_Lock);
+        const auto                 LookupIt = m_BlobLookup.find(BlobHash);
+        if (LookupIt == m_BlobLookup.end())
+        {
+            // First time this hash is seen: append a payload entry and a blob entry.
+            const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+            m_PayloadEntries.push_back(Entry);
+
+            const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+            // we only remove during GC and compact this then...
+            m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
+            m_BlobLookup.insert({BlobHash, NewBlobIndex});
+        }
+        else
+        {
+            BlobEntry& Blob = m_BlobEntries[LookupIt->second];
+            if (!Blob.Payload)
+            {
+                // The blob was known through its metadata only: attach a new payload entry.
+                Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+                m_PayloadEntries.push_back(Entry);
+            }
+            else
+            {
+                // A payload appeared since the shared-lock check: overwrite its entry.
+                m_PayloadEntries[Blob.Payload] = Entry;
+            }
+            Blob.LastAccessTime = GcClock::TickCount();
+        }
+    }
+
+    m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
+    m_LastAccessTimeUpdateCount++;
+}
+
+// Returns the payload stored for BlobHash.
+//
+// Looking up an indexed blob refreshes its last access time and bumps
+// m_LastAccessTimeUpdateCount, which is what makes Flush() write the access times file.
+// The store that is queried depends on the PayloadEntry::kStandalone flag: the large blob
+// store when set, the small blob store otherwise. The lookup lock is released before the
+// store is queried.
+//
+// Returns the chunk tagged as ZenContentType::kCompressedBinary, or an empty IoBuffer when
+// the hash is unknown, has no payload, or the chunk cannot be found in its store (the last
+// case is logged as an inconsistency).
+IoBuffer
+BuildStore::GetBlob(const IoHash& BlobHash)
+{
+    ZEN_TRACE_CPU("BuildStore::GetBlob");
+    ZEN_MEMSCOPE(GetBuildstoreTag());
+    RwLock::SharedLockScope Lock(m_Lock);
+    if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
+    {
+        const BlobIndex ExistingBlobIndex = It->second;
+        BlobEntry&      Blob              = m_BlobEntries[ExistingBlobIndex];
+        Blob.LastAccessTime               = GcClock::TickCount();
+        // Count the access time change so that Flush() persists it; without this a
+        // read-only workload would never get its access times written.
+        m_LastAccessTimeUpdateCount++;
+        if (Blob.Payload)
+        {
+            const PayloadEntry& Entry        = m_PayloadEntries[Blob.Payload];
+            const bool          IsStandalone = (Entry.GetFlags() & PayloadEntry::kStandalone) != 0;
+            // Everything needed from the index has been read; do the store lookup unlocked.
+            Lock.ReleaseNow();
+
+            IoBuffer Chunk;
+            if (IsStandalone)
+            {
+                ZEN_TRACE_CPU("GetLarge");
+                Chunk = m_LargeBlobStore.FindChunk(BlobHash);
+            }
+            else
+            {
+                ZEN_TRACE_CPU("GetSmall");
+                Chunk = m_SmallBlobStore.FindChunk(BlobHash);
+            }
+            if (Chunk)
+            {
+                Chunk.SetContentType(ZenContentType::kCompressedBinary);
+                return Chunk;
+            }
+            else
+            {
+                ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? "on disk" : "in block");
+            }
+        }
+    }
+    return {};
+}
+
+// Reports, for each hash in BlobHashes, whether a payload and/or metadata is indexed.
+//
+// The result has one element per input hash, in input order. Unknown hashes produce a
+// default-constructed BlobExistsResult. Access times are not touched.
+std::vector<BuildStore::BlobExistsResult>
+BuildStore::BlobsExists(std::span<const IoHash> BlobHashes)
+{
+    ZEN_TRACE_CPU("BuildStore::BlobsExists");
+    ZEN_MEMSCOPE(GetBuildstoreTag());
+
+    std::vector<BuildStore::BlobExistsResult> Answers;
+    Answers.reserve(BlobHashes.size());
+
+    RwLock::SharedLockScope ReadLock(m_Lock);
+    for (const IoHash& Hash : BlobHashes)
+    {
+        const auto LookupIt = m_BlobLookup.find(Hash);
+        if (LookupIt == m_BlobLookup.end())
+        {
+            Answers.push_back({});
+            continue;
+        }
+
+        const BlobIndex FoundIndex = LookupIt->second;
+        BlobEntry&      Found      = m_BlobEntries[FoundIndex];
+        const bool      HasBody    = !!Found.Payload;
+        const bool      HasMeta    = !!Found.Metadata;
+        Answers.push_back(BlobExistsResult{.HasBody = HasBody, .HasMetadata = HasMeta});
+    }
+    return Answers;
+}
+
+// Writes one metadata buffer per blob hash to the metadata block store and indexes it.
+//
+// BlobHashes[i] and MetaDatas[i] are consumed in lockstep; the two spans are assumed to
+// have the same length (not checked here - confirm with callers). For every written chunk
+// the in-memory index is updated, a record is appended to the metadata log, and the hash
+// is added to m_TrackedCacheKeys when that set is present.
+void
+BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas)
+{
+    ZEN_TRACE_CPU("BuildStore::PutMetadatas");
+    ZEN_MEMSCOPE(GetBuildstoreTag());
+
+    // Index into BlobHashes/MetaDatas of the next chunk to be reported by WriteChunks.
+    // It lives outside the callback so it carries over between callback invocations.
+    size_t NextInputIndex = 0;
+    m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span<BlockStoreLocation> Locations) {
+        RwLock::ExclusiveLockScope WriteLock(m_Lock);
+        for (const BlockStoreLocation& Location : Locations)
+        {
+            const IoBuffer& Data     = MetaDatas[NextInputIndex];
+            const IoHash&   BlobHash = BlobHashes[NextInputIndex];
+
+            MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0};
+
+            const auto LookupIt = m_BlobLookup.find(BlobHash);
+            if (LookupIt == m_BlobLookup.end())
+            {
+                // Unknown blob: create a metadata-only blob entry.
+                const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size()));
+                m_MetadataEntries.push_back(Entry);
+
+                const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+                m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
+                m_BlobLookup.insert({BlobHash, NewBlobIndex});
+            }
+            else
+            {
+                BlobEntry& Blob = m_BlobEntries[LookupIt->second];
+                if (!Blob.Metadata)
+                {
+                    // Known blob without metadata: attach a new metadata entry.
+                    Blob.Metadata = MetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size()));
+                    m_MetadataEntries.push_back(Entry);
+                }
+                else
+                {
+                    // Known blob with metadata: overwrite the existing entry in place.
+                    m_MetadataEntries[Blob.Metadata] = Entry;
+                }
+                Blob.LastAccessTime = GcClock::TickCount();
+            }
+
+            m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
+
+            m_LastAccessTimeUpdateCount++;
+            NextInputIndex++;
+            if (m_TrackedCacheKeys)
+            {
+                m_TrackedCacheKeys->insert(BlobHash);
+            }
+        }
+    });
+}
+
+// Fetches the metadata buffers for BlobHashes.
+//
+// The result has one element per input hash, in input order; hashes that are unknown, have
+// no metadata, or whose chunk could not be read yield an empty IoBuffer. Every hash found
+// in the index gets its last access time refreshed. When OptionalWorkerPool is non-null and
+// the requested chunks span more than one block, blocks are read in parallel on that pool.
+// Non-empty results are tagged with the content type recorded in the metadata entry.
+std::vector<IoBuffer>
+BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool)
+{
+ ZEN_TRACE_CPU("BuildStore::GetMetadatas");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+ // MetaLocations[i] is the block store location of a chunk to read and
+ // MetaLocationResultIndexes[i] is the slot in Result that chunk belongs to.
+ std::vector<BlockStoreLocation> MetaLocations;
+ std::vector<size_t> MetaLocationResultIndexes;
+ MetaLocations.reserve(BlobHashes.size());
+ MetaLocationResultIndexes.reserve(BlobHashes.size());
+ // Distinct block indexes touched by the request; only its size is used below.
+ tsl::robin_set<uint32_t> ReferencedBlocks;
+
+ std::vector<IoBuffer> Result;
+ std::vector<ZenContentType> ResultContentTypes;
+ Result.resize(BlobHashes.size());
+ ResultContentTypes.resize(BlobHashes.size(), ZenContentType::kUnknownContentType);
+ {
+ // Resolve hashes to locations under the shared lock; the reads happen after it is released.
+ RwLock::SharedLockScope _(m_Lock);
+ for (size_t Index = 0; Index < BlobHashes.size(); Index++)
+ {
+ const IoHash& BlobHash = BlobHashes[Index];
+ if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
+ {
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& ExistingBlobEntry = m_BlobEntries[ExistingBlobIndex];
+ if (ExistingBlobEntry.Metadata)
+ {
+ const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata];
+ MetaLocations.push_back(ExistingMetadataEntry.Location);
+ MetaLocationResultIndexes.push_back(Index);
+ ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex);
+ ResultContentTypes[Index] = ExistingMetadataEntry.ContentType;
+ }
+ ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::TickCount());
+ m_LastAccessTimeUpdateCount++;
+ }
+ }
+ }
+
+ // Reads the chunks selected by ChunkIndexes (indexes into MetaLocations) and stores them in
+ // Result. Fewer than four chunks are fetched one by one with TryGetChunk and the lambda
+ // returns true; otherwise the return value of IterateBlock is passed through.
+ auto DoOneBlock = [this](std::span<const BlockStoreLocation> MetaLocations,
+ std::span<const size_t> MetaLocationResultIndexes,
+ std::span<const size_t> ChunkIndexes,
+ std::vector<IoBuffer>& Result) {
+ if (ChunkIndexes.size() < 4)
+ {
+ for (size_t ChunkIndex : ChunkIndexes)
+ {
+ IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]);
+ if (Chunk)
+ {
+ size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
+ Result[ResultIndex] = std::move(Chunk);
+ }
+ }
+ return true;
+ }
+ return m_MetadataBlockStore.IterateBlock(
+ MetaLocations,
+ ChunkIndexes,
+ // Callback receiving the chunk as a memory range: the bytes are cloned into the result.
+ [&MetaLocationResultIndexes, &Result](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ if (Data != nullptr)
+ {
+ size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
+ Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size);
+ }
+ return true;
+ },
+ // Callback receiving the chunk as a file range: the result references the block file.
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
+ Result[ResultIndex] = File.GetChunk(Offset, Size);
+ return true;
+ },
+ // NOTE(review): presumably the size threshold that selects between the two callbacks - confirm.
+ 8u * 1024u);
+ };
+
+ if (!MetaLocations.empty())
+ {
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+
+ try
+ {
+ m_MetadataBlockStore.IterateChunks(
+ MetaLocations,
+ [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock](
+ uint32_t BlockIndex,
+ std::span<const size_t> ChunkIndexes) -> bool {
+ ZEN_UNUSED(BlockIndex);
+ // Read inline when this callback covers every chunk, no pool was given, or only a
+ // single block is referenced; scheduling would gain nothing in those cases.
+ if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1)
+ {
+ return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
+ }
+ else
+ {
+ ZEN_ASSERT(OptionalWorkerPool != nullptr);
+ // The scheduled task gets its own copy of the chunk indexes, moved into the task below.
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this,
+ &Result,
+ &MetaLocations,
+ &MetaLocationResultIndexes,
+ DoOneBlock,
+ ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ try
+ {
+ if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result))
+ {
+ AbortFlag.store(true);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ // A failing block is logged; its result slots simply stay empty.
+ ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
+ }
+ });
+ return !Work.IsAborted();
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed iterating block metadata chunks in {}. Reason: '{}'", m_Config.RootDirectory, Ex.what());
+ }
+
+ // The scheduled tasks capture locals by reference, so wait for them before continuing.
+ Work.Wait();
+ }
+ // Tag every buffer that was actually read with the content type recorded in the index.
+ for (size_t Index = 0; Index < Result.size(); Index++)
+ {
+ if (Result[Index])
+ {
+ Result[Index].SetContentType(ResultContentTypes[Index]);
+ }
+ }
+ return Result;
+}
+
+// Flushes the blob stores, the metadata block store and both log files, then rewrites the
+// access times file when at least one access time update has been counted since the
+// previous write. Failures are logged via ZEN_ERROR; std::exception does not escape.
+void
+BuildStore::Flush()
+{
+    ZEN_TRACE_CPU("BuildStore::Flush");
+    try
+    {
+        Stopwatch  FlushTimer;
+        const auto LogOnExit = MakeGuard([&] {
+            ZEN_INFO("Flushed build store at {} in {}", m_Config.RootDirectory, NiceTimeSpanMs(FlushTimer.GetElapsedTimeMs()));
+        });
+
+        m_LargeBlobStore.Flush();
+        m_SmallBlobStore.Flush();
+        m_MetadataBlockStore.Flush(false);
+
+        m_PayloadlogFile.Flush();
+        m_MetadatalogFile.Flush();
+
+        const uint64_t PendingUpdates = m_LastAccessTimeUpdateCount.load();
+        if (PendingUpdates > 0)
+        {
+            // Subtract only what was observed so that updates counted after the load above
+            // stay pending for the next flush.
+            m_LastAccessTimeUpdateCount -= PendingUpdates;
+            RwLock::ExclusiveLockScope UpdateLock(m_Lock);
+            WriteAccessTimes(UpdateLock, blobstore::impl::GetAccessTimesPath(m_Config.RootDirectory));
+        }
+    }
+    catch (const std::exception& Ex)
+    {
+        ZEN_ERROR("BuildStore::Flush failed. Reason: {}", Ex.what());
+    }
+}
+
+// Computes entry, blob and metadata counts and byte totals from the in-memory index.
+//
+// Payloads are counted as "large" when their entry has PayloadEntry::kStandalone set and as
+// "small" otherwise. Sizes come from the index entries, not from the files on disk.
+BuildStore::StorageStats
+BuildStore::GetStorageStats() const
+{
+    StorageStats Stats;
+    {
+        RwLock::SharedLockScope ReadLock(m_Lock);
+        Stats.EntryCount = m_BlobLookup.size();
+
+        for (const auto& LookupEntry : m_BlobLookup)
+        {
+            const BlobIndex  EntryIndex = LookupEntry.second;
+            const BlobEntry& Blob       = m_BlobEntries[EntryIndex];
+
+            if (Blob.Payload)
+            {
+                const PayloadEntry& Payload      = m_PayloadEntries[Blob.Payload];
+                const uint64_t      PayloadBytes = Payload.GetSize();
+                const bool          IsStandalone = (Payload.GetFlags() & PayloadEntry::kStandalone) != 0;
+                if (IsStandalone)
+                {
+                    Stats.LargeBlobCount++;
+                    Stats.LargeBlobBytes += PayloadBytes;
+                }
+                else
+                {
+                    Stats.SmallBlobCount++;
+                    Stats.SmallBlobBytes += PayloadBytes;
+                }
+            }
+
+            if (Blob.Metadata)
+            {
+                const MetadataEntry& Metadata = m_MetadataEntries[Blob.Metadata];
+                Stats.MetadataCount++;
+                Stats.MetadataByteCount += Metadata.Location.Size;
+            }
+        }
+    }
+    return Stats;
+}
+
+#if ZEN_WITH_TESTS
+// Test helper: returns the last access time recorded for Key, or an empty optional when
+// Key is not in the index.
+std::optional<AccessTime>
+BuildStore::GetLastAccessTime(const IoHash& Key) const
+{
+    RwLock::SharedLockScope ReadLock(m_Lock);
+    const auto              LookupIt = m_BlobLookup.find(Key);
+    if (LookupIt == m_BlobLookup.end())
+    {
+        return {};
+    }
+    const BlobIndex  FoundIndex = LookupIt->second;
+    const BlobEntry& Found      = m_BlobEntries[FoundIndex];
+    return Found.LastAccessTime;
+}
+
+// Test helper: overwrites the last access time recorded for Key.
+// Returns true when Key is in the index, false (and changes nothing) otherwise.
+bool
+BuildStore::SetLastAccessTime(const IoHash& Key, const AccessTime& Time)
+{
+    RwLock::SharedLockScope ReadLock(m_Lock);
+    const auto              LookupIt = m_BlobLookup.find(Key);
+    const bool              IsKnown  = (LookupIt != m_BlobLookup.end());
+    if (IsKnown)
+    {
+        const BlobIndex FoundIndex = LookupIt->second;
+        BlobEntry&      Found      = m_BlobEntries[FoundIndex];
+        Found.LastAccessTime       = Time;
+    }
+    return IsKnown;
+}
+#endif // ZEN_WITH_TESTS
+
+// Rebuilds the in-memory index so that it contains only what m_BlobLookup still references.
+//
+// Blob, payload and metadata entries are copied into fresh, densely packed arrays in the
+// iteration order of m_BlobLookup, all indexes are rewritten to match, and the new
+// containers are swapped in. Entries no longer reachable from the lookup ("holes" left by
+// tombstones and removals) are dropped. Takes m_Lock exclusively for the whole rebuild.
+void
+BuildStore::CompactState()
+{
+    ZEN_TRACE_CPU("BuildStore::CompactState");
+
+    std::vector<BlobEntry>     BlobEntries;
+    std::vector<PayloadEntry>  PayloadEntries;
+    std::vector<MetadataEntry> MetadataEntries;
+
+    tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> BlobLookup;
+
+    RwLock::ExclusiveLockScope _(m_Lock);
+    const size_t               EntryCount = m_BlobLookup.size();
+    BlobLookup.reserve(EntryCount);
+    // Exactly one blob entry is written per lookup entry, so the final size is known up front.
+    BlobEntries.reserve(EntryCount);
+    const size_t PayloadCount = m_PayloadEntries.size();
+    PayloadEntries.reserve(PayloadCount);
+    const size_t MetadataCount = m_MetadataEntries.size();
+    MetadataEntries.reserve(MetadataCount);
+
+    // Iterate by reference: copying the (hash, index) pair per iteration is unnecessary.
+    for (const auto& LookupIt : m_BlobLookup)
+    {
+        const IoHash&    BlobHash      = LookupIt.first;
+        const BlobIndex  ReadBlobIndex = LookupIt.second;
+        const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+
+        const BlobIndex WriteBlobIndex(gsl::narrow<uint32_t>(BlobEntries.size()));
+        BlobEntries.push_back(ReadBlobEntry);
+        // Safe to hold: nothing else is pushed to BlobEntries before the next iteration.
+        BlobEntry& WriteBlobEntry = BlobEntries.back();
+
+        if (WriteBlobEntry.Payload)
+        {
+            const PayloadEntry& ReadPayloadEntry = m_PayloadEntries[ReadBlobEntry.Payload];
+            WriteBlobEntry.Payload               = PayloadIndex(gsl::narrow<uint32_t>(PayloadEntries.size()));
+            PayloadEntries.push_back(ReadPayloadEntry);
+        }
+        if (ReadBlobEntry.Metadata)
+        {
+            const MetadataEntry& ReadMetadataEntry = m_MetadataEntries[ReadBlobEntry.Metadata];
+            WriteBlobEntry.Metadata                = MetadataIndex(gsl::narrow<uint32_t>(MetadataEntries.size()));
+            MetadataEntries.push_back(ReadMetadataEntry);
+        }
+
+        BlobLookup.insert({BlobHash, WriteBlobIndex});
+    }
+    m_BlobEntries.swap(BlobEntries);
+    m_PayloadEntries.swap(PayloadEntries);
+    m_MetadataEntries.swap(MetadataEntries);
+    m_BlobLookup.swap(BlobLookup);
+}
+
+// Replays the payload log at LogPath into the in-memory index.
+//
+// The unnamed lock parameter documents that the caller holds m_Lock exclusively.
+// SkipEntryCount leading entries are skipped; when it exceeds the number of entries in the
+// log, the whole log is replayed instead. A log that fails CasLog.IsValid() is deleted.
+//
+// Returns the number of log entries covered by the replay (entry count minus skipped
+// entries), or 0 when the log is missing, invalid, or fails to initialize.
+uint64_t
+BuildStore::ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
+{
+ ZEN_TRACE_CPU("BuildStore::ReadPayloadLog");
+ if (!IsFile(LogPath))
+ {
+ return 0;
+ }
+
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ // Logs the entry count and elapsed time on every exit path below.
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read build store '{}' payload log containing {} entries in {}",
+ LogPath,
+ LogEntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ TCasLogFile<PayloadDiskEntry> CasLog;
+ if (!CasLog.IsValid(LogPath))
+ {
+ RemoveFile(LogPath);
+ return 0;
+ }
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (!CasLog.Initialize())
+ {
+ return 0;
+ }
+
+ const uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full payload log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+
+ LogEntryCount = EntryCount - SkipEntryCount;
+ uint64_t InvalidEntryCount = 0;
+
+ CasLog.Replay(
+ [&](const PayloadDiskEntry& Record) {
+ std::string InvalidEntryReason;
+ // Tombstones are applied before validation: they remove the payload of a known blob.
+ if (Record.Entry.GetFlags() & PayloadEntry::kTombStone)
+ {
+ // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get cleaned up in the compact gc operation
+ if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end())
+ {
+ // Without metadata nothing is left of the blob, so drop it from the lookup;
+ // otherwise keep the blob and only detach its payload.
+ if (!m_BlobEntries[ExistingIt->second].Metadata)
+ {
+ m_BlobLookup.erase(ExistingIt);
+ }
+ else
+ {
+ m_BlobEntries[ExistingIt->second].Payload = {};
+ }
+ }
+ return;
+ }
+
+ if (!ValidatePayloadDiskEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid payload entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end())
+ {
+ // Known blob: a later record overwrites an existing payload entry, or attaches a new one.
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex];
+ if (ExistingBlob.Payload)
+ {
+ const PayloadIndex ExistingPayloadIndex = ExistingBlob.Payload;
+ m_PayloadEntries[ExistingPayloadIndex] = Record.Entry;
+ }
+ else
+ {
+ const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ m_PayloadEntries.push_back(Record.Entry);
+ ExistingBlob.Payload = NewPayloadIndex;
+ }
+ }
+ else
+ {
+ // Unknown blob: create payload and blob entries; the access time starts at "now".
+ const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ m_PayloadEntries.push_back(Record.Entry);
+
+ const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+ m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
+ m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex);
+ }
+ },
+ SkipEntryCount);
+
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid payload entries in '{}'", InvalidEntryCount, LogPath);
+ }
+
+ return LogEntryCount;
+}
+
+// Replays the metadata log at LogPath into the in-memory index.
+//
+// The unnamed lock parameter documents that the caller holds m_Lock exclusively.
+// SkipEntryCount leading entries are skipped; when it exceeds the number of entries in the
+// log, the whole log is replayed instead. A log that fails CasLog.IsValid() is deleted.
+//
+// Returns the number of log entries covered by the replay (entry count minus skipped
+// entries), or 0 when the log is missing, invalid, or fails to initialize.
+uint64_t
+BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
+{
+ ZEN_TRACE_CPU("BuildStore::ReadMetadataLog");
+ if (!IsFile(LogPath))
+ {
+ return 0;
+ }
+
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ // Logs the entry count and elapsed time on every exit path below.
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read build store '{}' metadata log containing {} entries in {}",
+ LogPath,
+ LogEntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ TCasLogFile<MetadataDiskEntry> CasLog;
+ if (!CasLog.IsValid(LogPath))
+ {
+ RemoveFile(LogPath);
+ return 0;
+ }
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (!CasLog.Initialize())
+ {
+ return 0;
+ }
+
+ const uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full metadata log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+
+ LogEntryCount = EntryCount - SkipEntryCount;
+ uint64_t InvalidEntryCount = 0;
+
+ CasLog.Replay(
+ [&](const MetadataDiskEntry& Record) {
+ std::string InvalidEntryReason;
+ // Tombstones are applied before validation: they remove the metadata of a known blob.
+ if (Record.Entry.Flags & MetadataEntry::kTombStone)
+ {
+ // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get cleaned up in the compact gc operation
+ if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end())
+ {
+ // Without a payload nothing is left of the blob, so drop it from the lookup;
+ // otherwise keep the blob and only detach its metadata.
+ if (!m_BlobEntries[ExistingIt->second].Payload)
+ {
+ m_BlobLookup.erase(ExistingIt);
+ }
+ else
+ {
+ m_BlobEntries[ExistingIt->second].Metadata = {};
+ }
+ }
+ return;
+ }
+
+ if (!ValidateMetadataDiskEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid metadata entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end())
+ {
+ // Known blob: a later record overwrites an existing metadata entry, or attaches a new one.
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex];
+ if (ExistingBlob.Metadata)
+ {
+ const MetadataIndex ExistingMetadataIndex = ExistingBlob.Metadata;
+ m_MetadataEntries[ExistingMetadataIndex] = Record.Entry;
+ }
+ else
+ {
+ const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size()));
+ m_MetadataEntries.push_back(Record.Entry);
+ ExistingBlob.Metadata = NewMetadataIndex;
+ }
+ }
+ else
+ {
+ // Unknown blob: create metadata and blob entries; the access time starts at "now".
+ const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size()));
+ m_MetadataEntries.push_back(Record.Entry);
+
+ const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+ m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
+ m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex);
+ }
+ },
+ SkipEntryCount);
+
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid metadata entries in '{}'", InvalidEntryCount, LogPath);
+ }
+
+ return LogEntryCount;
+}
+
+// Loads last access times from the file at AccessTimesPath into the blob entries.
+//
+// The unnamed lock parameter documents that the caller holds m_Lock exclusively.
+// Whenever the file cannot be applied in full - too small for a header, bad magic, version
+// or checksum, shorter than the record count promises, or a record names a blob that is not
+// in the index - m_LastAccessTimeUpdateCount is incremented, which makes the next Flush()
+// rewrite the file.
+void
+BuildStore::ReadAccessTimes(const RwLock::ExclusiveLockScope&, const std::filesystem::path& AccessTimesPath)
+{
+    ZEN_TRACE_CPU("BuildStore::ReadAccessTimes");
+
+    using namespace blobstore::impl;
+
+    BasicFile AccessTimesFile;
+    AccessTimesFile.Open(AccessTimesPath, BasicFile::Mode::kRead);
+
+    const uint64_t Size = AccessTimesFile.FileSize();
+    if (Size < sizeof(AccessTimesHeader))
+    {
+        m_LastAccessTimeUpdateCount++;
+        return;
+    }
+
+    AccessTimesHeader Header;
+    AccessTimesFile.Read(&Header, sizeof(Header), 0);
+
+    // The records start right after the header, rounded up to the data alignment.
+    uint64_t RecordsOffset = sizeof(AccessTimesHeader);
+    RecordsOffset          = RoundUp(RecordsOffset, AccessTimesHeader::DataAlignment);
+
+    const bool HeaderIsValid = (Header.Magic == AccessTimesHeader::ExpectedMagic) &&
+                               (Header.Version == AccessTimesHeader::CurrentVersion) &&
+                               (Header.Checksum == AccessTimesHeader::ComputeChecksum(Header));
+    if (!HeaderIsValid)
+    {
+        m_LastAccessTimeUpdateCount++;
+        return;
+    }
+
+    const uint64_t RecordsSize = sizeof(AccessTimeRecord) * Header.AccessTimeCount;
+    if (AccessTimesFile.FileSize() < RecordsOffset + RecordsSize)
+    {
+        m_LastAccessTimeUpdateCount++;
+        return;
+    }
+
+    std::vector<AccessTimeRecord> AccessRecords(Header.AccessTimeCount);
+    AccessTimesFile.Read(AccessRecords.data(), RecordsSize, RecordsOffset);
+    for (const AccessTimeRecord& Record : AccessRecords)
+    {
+        const auto LookupIt = m_BlobLookup.find(Record.Key);
+        if (LookupIt == m_BlobLookup.end())
+        {
+            // The file mentions a blob the index no longer has: mark the file as stale.
+            m_LastAccessTimeUpdateCount++;
+            continue;
+        }
+        const BlobIndex FoundIndex = LookupIt->second;
+        BlobEntry&      Found      = m_BlobEntries[FoundIndex];
+        Found.LastAccessTime.SetSecondsSinceEpoch(Record.SecondsSinceEpoch);
+    }
+}
+
+// Writes the last access time of every blob in m_BlobLookup to the file at AccessTimesPath.
+//
+// The unnamed lock parameter documents that the caller holds m_Lock exclusively.
+// The data is written to a temporary file in the same directory and then moved into place.
+// File layout: an AccessTimesHeader at offset 0, followed (at the header size rounded up to
+// AccessTimesHeader::DataAlignment) by AccessTimeCount raw AccessTimeRecord entries.
+//
+// Throws std::runtime_error when the temporary file cannot be created or moved into place.
+void
+BuildStore::WriteAccessTimes(const RwLock::ExclusiveLockScope&, const std::filesystem::path& AccessTimesPath)
+{
+    ZEN_TRACE_CPU("BuildStore::WriteAccessTimes");
+
+    using namespace blobstore::impl;
+
+    uint32_t          Count  = gsl::narrow<uint32_t>(m_BlobLookup.size());
+    AccessTimesHeader Header = {.AccessTimeCount = Count};
+    Header.Checksum          = AccessTimesHeader::ComputeChecksum(Header);
+
+    TemporaryFile   TempFile;
+    std::error_code Ec;
+    if (TempFile.CreateTemporary(AccessTimesPath.parent_path(), Ec); Ec)
+    {
+        throw std::runtime_error(fmt::format("Failed to create temporary file {} to write access times. Reason ({}) {}",
+                                             TempFile.GetPath(),
+                                             Ec.value(),
+                                             Ec.message()));
+    }
+    {
+        uint64_t Offset = 0;
+        TempFile.Write(&Header, sizeof(AccessTimesHeader), Offset);
+        Offset += sizeof(AccessTimesHeader);
+        Offset = RoundUp(Offset, AccessTimesHeader::DataAlignment);
+
+        std::vector<AccessTimeRecord> AccessRecords;
+        AccessRecords.reserve(Header.AccessTimeCount);
+
+        // Iterate by reference: copying the (hash, index) pair per iteration is unnecessary.
+        for (const auto& It : m_BlobLookup)
+        {
+            const IoHash&    Key               = It.first;
+            const BlobIndex  Index             = It.second;
+            const BlobEntry& Entry             = m_BlobEntries[Index];
+            const uint32_t   SecondsSinceEpoch = Entry.LastAccessTime.GetSecondsSinceEpoch();
+            AccessRecords.push_back(AccessTimeRecord{.Key = Key, .SecondsSinceEpoch = SecondsSinceEpoch});
+        }
+        // The records are the last thing in the file, so Offset is not advanced after this write.
+        uint64_t RecordsSize = sizeof(AccessTimeRecord) * Header.AccessTimeCount;
+        TempFile.Write(AccessRecords.data(), RecordsSize, Offset);
+    }
+    if (TempFile.MoveTemporaryIntoPlace(AccessTimesPath, Ec); Ec)
+    {
+        throw std::runtime_error(fmt::format("Failed to move temporary file {} to {} when write access times. Reason ({}) {}",
+                                             TempFile.GetPath(),
+                                             AccessTimesPath,
+                                             Ec.value(),
+                                             Ec.message()));
+    }
+}
+
+// Checks a payload log record for obvious corruption.
+//
+// A record is rejected when its blob hash is IoHash::Zero, when it carries flag bits other
+// than kTombStone / kStandalone, or - for non-tombstone records - when its size is 0 or
+// 0x00ffffffffffffff. Tombstone records skip the size check.
+//
+// Returns true when the record is acceptable; otherwise returns false and stores a
+// human-readable explanation in OutReason.
+bool
+BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason)
+{
+    if (Entry.BlobHash == IoHash::Zero)
+    {
+        OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString());
+        return false;
+    }
+    if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone))
+    {
+        OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.GetFlags(), Entry.BlobHash.ToHexString());
+        return false;
+    }
+    if (Entry.Entry.GetFlags() & PayloadEntry::kTombStone)
+    {
+        return true;
+    }
+    if (Entry.Entry.GetSize() == 0 || Entry.Entry.GetSize() == 0x00ffffffffffffffu)
+    {
+        // This validator handles payload records, so the message names a payload entry.
+        OutReason = fmt::format("Invalid size field {} for payload entry {}", Entry.Entry.GetSize(), Entry.BlobHash.ToHexString());
+        return false;
+    }
+    return true;
+}
+
+// Checks the fields of a metadata disk entry for validity.
+//
+// Entry     - the metadata disk entry to inspect
+// OutReason - receives a description of the first failed check; only written when false is returned
+//
+// Returns true if the entry passed every check. The hash, size and inner reserved fields are checked for
+// all entries; content type and outer reserved fields are only checked for entries that are not tombstones.
+bool
+BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason)
+{
+    // Records the failure reason and yields the "invalid" result in one step
+    const auto Reject = [&OutReason](std::string Reason) {
+        OutReason = std::move(Reason);
+        return false;
+    };
+
+    const auto& Meta = Entry.Entry;
+
+    if (Entry.BlobHash == IoHash::Zero)
+    {
+        return Reject(fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString()));
+    }
+    if (Meta.Location.Size == 0)
+    {
+        return Reject(fmt::format("Invalid meta blob size {} for meta entry", Meta.Location.Size));
+    }
+    if (Meta.Reserved1 != 0 || Meta.Reserved2 != 0)
+    {
+        return Reject(fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()));
+    }
+
+    // The remaining checks are skipped for tombstones
+    if (Meta.Flags & MetadataEntry::kTombStone)
+    {
+        return true;
+    }
+
+    if (Meta.ContentType == ZenContentType::kCOUNT)
+    {
+        return Reject(fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString()));
+    }
+    if (Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0)
+    {
+        return Reject(fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()));
+    }
+    return true;
+}
+
+// Reference checker for the build store.
+//
+// UpdateLockedState gathers the keys of all blobs that have a payload, GetUnusedReferences then uses that
+// list to filter the candidate hashes handed in by the caller.
+class BuildStoreGcReferenceChecker : public GcReferenceChecker
+{
+public:
+    BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {}
+
+    // Name of this checker, built from the store root directory
+    virtual std::string GetGcName(GcCtx& Ctx) override
+    {
+        ZEN_UNUSED(Ctx);
+        const std::string RootText = m_Store.m_Config.RootDirectory.string();
+        return fmt::format("buildstore: '{}'", RootText);
+    }
+
+    // Nothing to prepare ahead of time
+    virtual void PreCache(GcCtx& Ctx) override
+    {
+        ZEN_UNUSED(Ctx);
+    }
+
+    // Collects the key of every blob that has a payload and passes the list through FilterReferences
+    virtual void UpdateLockedState(GcCtx& Ctx) override
+    {
+        ZEN_TRACE_CPU("Builds::UpdateLockedState");
+        ZEN_MEMSCOPE(GetBuildstoreTag());
+
+        auto Log = [&Ctx]() { return Ctx.Logger; };
+
+        const auto& Lookup = m_Store.m_BlobLookup;
+        m_References.reserve(Lookup.size());
+        for (const auto& [BlobHash, BlobIdx] : Lookup)
+        {
+            if (!m_Store.m_BlobEntries[BlobIdx].Payload)
+            {
+                continue;
+            }
+            m_References.push_back(BlobHash);
+        }
+
+        const std::string Context = fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore");
+        FilterReferences(Ctx, Context, m_References);
+    }
+
+    // Returns the part of IoCids that KeepUnusedReferences reports as unused given the collected references
+    virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
+    {
+        ZEN_UNUSED(Ctx);
+        ZEN_TRACE_CPU("Builds::GetUnusedReferences");
+        ZEN_MEMSCOPE(GetBuildstoreTag());
+
+        auto Log = [&Ctx]() { return Ctx.Logger; };
+
+        const size_t CandidateCount = IoCids.size();
+        size_t       FilteredCount  = CandidateCount;
+
+        Stopwatch  Timer;
+        const auto ReportOnExit = MakeGuard([&] {
+            if (Ctx.Settings.Verbose)
+            {
+                ZEN_INFO("GCV2: buildstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
+                         "buildstore",
+                         FilteredCount,
+                         CandidateCount,
+                         NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+            }
+        });
+
+        std::span<IoHash> Remaining = KeepUnusedReferences(m_References, IoCids);
+        FilteredCount               = IoCids.size() - Remaining.size();
+        return Remaining;
+    }
+
+private:
+    BuildStore&         m_Store;
+    std::vector<IoHash> m_References;
+};
+
+// Name of the build store as a GC participant, built from the configured root directory.
+std::string
+BuildStore::GetGcName(GcCtx& Ctx)
+{
+    ZEN_MEMSCOPE(GetBuildstoreTag());
+    ZEN_UNUSED(Ctx);
+
+    const std::string RootText = m_Config.RootDirectory.string();
+    return fmt::format("buildstore: '{}'", RootText);
+}
+
+// Store compactor for the build store metadata block store.
+//
+// Created with the list of blobs removed by the preceding expiry pass. CompactStore only does work when
+// that list is non-empty and the GC settings ask for small object collection; it then computes per-block
+// usage of the live metadata, asks the block store which blocks to compact and rewrites the kept chunks.
+class BuildStoreGcCompator : public GcStoreCompactor
+{
+ using BlobEntry = BuildStore::BlobEntry;
+ using PayloadEntry = BuildStore::PayloadEntry;
+ using MetadataEntry = BuildStore::MetadataEntry;
+ using MetadataDiskEntry = BuildStore::MetadataDiskEntry;
+ using BlobIndex = BuildStore::BlobIndex;
+ using PayloadIndex = BuildStore::PayloadIndex;
+ using MetadataIndex = BuildStore::MetadataIndex;
+
+public:
+ BuildStoreGcCompator(BuildStore& Store, std::vector<IoHash>&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {}
+
+ // Compacts metadata blocks and adds the freed disk space to Stats.RemovedDisk.
+ // ClaimDiskReserveCallback is forwarded to the block store compaction.
+ virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
+ {
+ ZEN_UNUSED(ClaimDiskReserveCallback);
+ ZEN_TRACE_CPU("Builds::CompactStore");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ Stopwatch Timer;
+ // Logs a summary on exit when verbose logging is enabled
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}",
+ m_Store.m_Config.RootDirectory,
+ NiceBytes(Stats.RemovedDisk),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ // The store is flushed when this function exits, whether or not anything was compacted
+ const auto __ = MakeGuard([&] { m_Store.Flush(); });
+
+ if (!m_RemovedBlobs.empty())
+ {
+ if (Ctx.Settings.CollectSmallObjects)
+ {
+ // Install a key tracking set for the duration of the compaction; the guard below removes it again.
+ // NOTE(review): presumably the store's write paths record keys in this set while it exists - confirm
+ m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique<HashSet>(); });
+ auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); });
+
+ // Per block: number of live metadata entries and their disk usage rounded up to the block store alignment
+ BlockStore::BlockUsageMap BlockUsage;
+ {
+ RwLock::SharedLockScope __(m_Store.m_Lock);
+
+ for (auto LookupIt : m_Store.m_BlobLookup)
+ {
+ const BlobIndex ReadBlobIndex = LookupIt.second;
+ const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex];
+
+ if (ReadBlobEntry.Metadata)
+ {
+ const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata];
+
+ uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex;
+ uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement);
+
+ if (auto BlockUsageIt = BlockUsage.find(BlockIndex); BlockUsageIt != BlockUsage.end())
+ {
+ BlockStore::BlockUsageInfo& Info = BlockUsageIt.value();
+ Info.EntryCount++;
+ Info.DiskUsage += ChunkSize;
+ }
+ else
+ {
+ BlockUsage.insert_or_assign(BlockIndex,
+ BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
+ }
+ }
+ }
+ }
+
+ // NOTE(review): 90 is presumably a usage percentage threshold - confirm against BlockStore::GetBlocksToCompact
+ BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90);
+ BlockStoreCompactState BlockCompactState;
+ // Keys of the chunks registered with BlockCompactState; the compaction callback indexes this vector with
+ // the chunk indexes it receives.
+ // NOTE(review): this relies on AddKeepLocation handing out chunk indexes in call order - confirm
+ std::vector<IoHash> BlockCompactStateKeys;
+ BlockCompactState.IncludeBlocks(BlocksToCompact);
+
+ if (BlocksToCompact.size() > 0)
+ {
+ {
+ // Register the location of every live metadata entry; only accepted locations get a key recorded
+ RwLock::SharedLockScope ___(m_Store.m_Lock);
+ for (const auto& Entry : m_Store.m_BlobLookup)
+ {
+ BlobIndex Index = Entry.second;
+
+ if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
+ {
+ if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location))
+ {
+ BlockCompactStateKeys.push_back(Entry.first);
+ }
+ }
+ }
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks",
+ m_Store.m_Config.RootDirectory,
+ BlocksToCompact.size());
+ }
+
+ m_Store.m_MetadataBlockStore.CompactBlocks(
+ BlockCompactState,
+ m_Store.m_Config.MetadataBlockStoreAlignement,
+ // Callback invoked by CompactBlocks with moved and scrubbed chunks; it runs under the exclusive
+ // store lock, updates the in-memory entries and appends the changes to the metadata log.
+ // Returns false to stop when the GC has been cancelled.
+ [&](const BlockStore::MovedChunksArray& MovedArray,
+ const BlockStore::ChunkIndexArray& ScrubbedArray,
+ uint64_t FreedDiskSpace) {
+ std::vector<MetadataDiskEntry> MovedEntries;
+ MovedEntries.reserve(MovedArray.size());
+ RwLock::ExclusiveLockScope _(m_Store.m_Lock);
+ for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
+ {
+ size_t ChunkIndex = Moved.first;
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
+
+ // Keys present in the tracking set keep their current location
+ ZEN_ASSERT(m_Store.m_TrackedCacheKeys);
+ if (m_Store.m_TrackedCacheKeys->contains(Key))
+ {
+ continue;
+ }
+
+ if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end())
+ {
+ const BlobIndex Index = It->second;
+
+ if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
+ {
+ m_Store.m_MetadataEntries[Meta].Location = Moved.second;
+ MovedEntries.push_back(
+ MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key});
+ }
+ }
+ }
+
+ // Scrubbed chunks are logged as tombstones and their metadata is cleared from the in-memory state
+ for (size_t Scrubbed : ScrubbedArray)
+ {
+ const IoHash& Key = BlockCompactStateKeys[Scrubbed];
+ if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end())
+ {
+ const BlobIndex Index = It->second;
+
+ if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
+ {
+ MovedEntries.push_back(
+ MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key});
+ MovedEntries.back().Entry.Flags |= MetadataEntry::kTombStone;
+ m_Store.m_MetadataEntries[Meta] = {};
+ m_Store.m_BlobEntries[Index].Metadata = {};
+ }
+ }
+ }
+
+ m_Store.m_MetadatalogFile.Append(MovedEntries);
+
+ Stats.RemovedDisk += FreedDiskSpace;
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return false;
+ }
+ return true;
+ },
+ ClaimDiskReserveCallback,
+ fmt::format("GCV2: buildstore [COMPACT] '{}': ", m_Store.m_Config.RootDirectory));
+ }
+ else
+ {
+ // Not in delete mode: only report what would have been compacted
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks",
+ m_Store.m_Config.RootDirectory,
+ BlocksToCompact.size());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Name of this compactor, built from the store root directory
+ virtual std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string());
+ }
+
+private:
+ BuildStore& m_Store;
+ // Blobs removed by the expiry pass; only checked for emptiness in CompactStore
+ const std::vector<IoHash> m_RemovedBlobs;
+};
+
+// Selects blobs to drop and, in delete mode, removes them from the in-memory state and the logs.
+//
+// A blob is selected when its last access time is older than Ctx.Settings.BuildStoreExpireTime. If the
+// combined disk size of the large blob store, small blob store and metadata block store exceeds
+// m_Config.MaxDiskSpaceLimit, further blobs are selected in least-recently-accessed order until the
+// estimated remaining size drops below 15/16 of the limit.
+//
+// Ctx   - GC context providing settings, logger and cancellation flag
+// Stats - CheckedCount, FoundCount and DeletedCount are updated
+//
+// Returns a newly allocated compactor that receives the list of removed blobs, or nullptr if the
+// collection was cancelled before removal started. The caller takes ownership of the returned pointer.
+GcStoreCompactor*
+BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
+{
+    ZEN_TRACE_CPU("Builds::RemoveExpiredData");
+    ZEN_MEMSCOPE(GetBuildstoreTag());
+
+    auto Log = [&Ctx]() { return Ctx.Logger; };
+
+    Stopwatch  Timer;
+    const auto _ = MakeGuard([&] {
+        if (Ctx.Settings.Verbose)
+        {
+            ZEN_INFO("GCV2: buildstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}",
+                     m_Config.RootDirectory,
+                     Stats.CheckedCount,
+                     Stats.FoundCount,
+                     Stats.DeletedCount,
+                     NiceBytes(Stats.FreedMemory),
+                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+        }
+    });
+
+    const GcClock::Tick                    ExpireTicks = Ctx.Settings.BuildStoreExpireTime.time_since_epoch().count();
+    std::vector<IoHash>                    ExpiredBlobs;
+    tsl::robin_set<IoHash, IoHash::Hasher> SizeDroppedBlobs;
+
+    {
+        struct SizeInfo
+        {
+            const IoHash Key;
+            uint32_t     SecondsSinceEpoch = 0;
+            uint64_t     BlobSize          = 0;
+        };
+
+        const uint64_t CurrentDiskSize =
+            m_LargeBlobStore.StorageSize().DiskSize + m_SmallBlobStore.StorageSize().DiskSize + m_MetadataBlockStore.TotalSize();
+        const bool DiskSizeExceeded = CurrentDiskSize > m_Config.MaxDiskSpaceLimit;
+
+        // Sum of payload and metadata sizes of everything selected for removal so far
+        uint64_t ExpiredDataSize = 0;
+
+        // Estimated disk size left once everything selected so far is gone. The subtraction saturates at zero
+        // since the per-blob sizes are only an estimate and an unsigned wrap-around would make the size based
+        // eviction below drop every remaining blob.
+        const auto RemainingDiskSize = [&]() -> uint64_t {
+            return (CurrentDiskSize > ExpiredDataSize) ? (CurrentDiskSize - ExpiredDataSize) : 0;
+        };
+
+        std::vector<SizeInfo> NonExpiredBlobSizeInfos;
+
+        {
+            RwLock::SharedLockScope __(m_Lock);
+            if (DiskSizeExceeded)
+            {
+                NonExpiredBlobSizeInfos.reserve(m_BlobLookup.size());
+            }
+            for (const auto& It : m_BlobLookup)
+            {
+                const BlobIndex  ReadBlobIndex = It.second;
+                const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+                uint64_t         Size          = 0;
+                if (ReadBlobEntry.Payload)
+                {
+                    const PayloadEntry& Payload = m_PayloadEntries[ReadBlobEntry.Payload];
+                    Size += Payload.GetSize();
+                }
+                if (ReadBlobEntry.Metadata)
+                {
+                    const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata];
+                    Size += Metadata.Location.Size;
+                }
+
+                const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime;
+                if (AccessTick < ExpireTicks)
+                {
+                    ExpiredBlobs.push_back(It.first);
+                    // Account for the size of this blob (previously the accumulator was added to itself and so
+                    // never left zero)
+                    ExpiredDataSize += Size;
+                }
+                else if (DiskSizeExceeded)
+                {
+                    NonExpiredBlobSizeInfos.emplace_back(SizeInfo{.Key               = It.first,
+                                                                  .SecondsSinceEpoch = ReadBlobEntry.LastAccessTime.GetSecondsSinceEpoch(),
+                                                                  .BlobSize          = Size});
+                }
+            }
+            Stats.CheckedCount += m_BlobLookup.size();
+            Stats.FoundCount += ExpiredBlobs.size();
+        }
+
+        if (DiskSizeExceeded)
+        {
+            const uint64_t NewSizeLimit =
+                m_Config.MaxDiskSpaceLimit -
+                (m_Config.MaxDiskSpaceLimit >> 4);  // Remove a bit more than just below the limit so we have some space to grow
+            if (RemainingDiskSize() > NewSizeLimit)
+            {
+                // Sort indexes rather than the entries themselves, SizeInfo has a const member and can't be assigned
+                std::vector<size_t> NonExpiredOrder;
+                NonExpiredOrder.resize(NonExpiredBlobSizeInfos.size());
+                for (size_t Index = 0; Index < NonExpiredOrder.size(); Index++)
+                {
+                    NonExpiredOrder[Index] = Index;
+                }
+                std::sort(NonExpiredOrder.begin(), NonExpiredOrder.end(), [&NonExpiredBlobSizeInfos](const size_t Lhs, const size_t Rhs) {
+                    const SizeInfo& LhsInfo = NonExpiredBlobSizeInfos[Lhs];
+                    const SizeInfo& RhsInfo = NonExpiredBlobSizeInfos[Rhs];
+                    return LhsInfo.SecondsSinceEpoch < RhsInfo.SecondsSinceEpoch;
+                });
+
+                // Drop the least recently accessed blobs first until we are below the new limit
+                for (const size_t OrderIndex : NonExpiredOrder)
+                {
+                    if (RemainingDiskSize() < NewSizeLimit)
+                    {
+                        break;
+                    }
+                    const SizeInfo& Info = NonExpiredBlobSizeInfos[OrderIndex];
+                    ExpiredDataSize += Info.BlobSize;
+                    ExpiredBlobs.push_back(Info.Key);
+                    SizeDroppedBlobs.insert(Info.Key);
+                }
+            }
+        }
+    }
+
+    std::vector<IoHash> RemovedBlobs;
+    if (!ExpiredBlobs.empty())
+    {
+        if (Ctx.Settings.IsDeleteMode)
+        {
+            RemovedBlobs.reserve(ExpiredBlobs.size());
+
+            std::vector<PayloadDiskEntry>  RemovedPayloads;
+            std::vector<MetadataDiskEntry> RemoveMetadatas;
+
+            RwLock::ExclusiveLockScope __(m_Lock);
+            if (Ctx.IsCancelledFlag.load())
+            {
+                return nullptr;
+            }
+
+            for (const IoHash& ExpiredBlob : ExpiredBlobs)
+            {
+                if (auto It = m_BlobLookup.find(ExpiredBlob); It != m_BlobLookup.end())
+                {
+                    const BlobIndex  ReadBlobIndex = It->second;
+                    const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+
+                    const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime;
+
+                    // The access time is checked again as the blob may have been touched after the scan above;
+                    // size dropped blobs are removed regardless of their access time
+                    if (SizeDroppedBlobs.contains(ExpiredBlob) || (AccessTick < ExpireTicks))
+                    {
+                        if (ReadBlobEntry.Payload)
+                        {
+                            RemovedPayloads.push_back(
+                                PayloadDiskEntry{.Entry = m_PayloadEntries[ReadBlobEntry.Payload], .BlobHash = ExpiredBlob});
+                            RemovedPayloads.back().Entry.AddFlag(PayloadEntry::kTombStone);
+                            m_PayloadEntries[ReadBlobEntry.Payload] = {};
+                            m_BlobEntries[ReadBlobIndex].Payload    = {};
+                        }
+                        if (ReadBlobEntry.Metadata)
+                        {
+                            RemoveMetadatas.push_back(
+                                MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob});
+                            RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone;
+                            m_MetadataEntries[ReadBlobEntry.Metadata] = {};
+                            m_BlobEntries[ReadBlobIndex].Metadata     = {};
+                        }
+
+                        m_BlobLookup.erase(It);
+                        m_LastAccessTimeUpdateCount++;
+
+                        RemovedBlobs.push_back(ExpiredBlob);
+                        Stats.DeletedCount++;
+                    }
+                }
+            }
+            // Persist the removals as tombstones in the respective logs
+            if (!RemovedPayloads.empty())
+            {
+                m_PayloadlogFile.Append(RemovedPayloads);
+            }
+            if (!RemoveMetadatas.empty())
+            {
+                m_MetadatalogFile.Append(RemoveMetadatas);
+            }
+        }
+    }
+
+    if (!RemovedBlobs.empty())
+    {
+        CompactState();
+    }
+
+    return new BuildStoreGcCompator(*this, std::move(RemovedBlobs));
+}
+
+// Creates the reference checkers for this store: a single newly allocated BuildStoreGcReferenceChecker.
+std::vector<GcReferenceChecker*>
+BuildStore::CreateReferenceCheckers(GcCtx& Ctx)
+{
+    ZEN_UNUSED(Ctx);
+    ZEN_MEMSCOPE(GetBuildstoreTag());
+
+    std::vector<GcReferenceChecker*> Checkers{new BuildStoreGcReferenceChecker(*this)};
+    return Checkers;
+}
+
+// The build store provides no reference validators, the returned list is always empty.
+std::vector<GcReferenceValidator*>
+BuildStore::CreateReferenceValidators(GcCtx& Ctx)
+{
+    ZEN_UNUSED(Ctx);
+    return std::vector<GcReferenceValidator*>{};
+}
+
+// Takes a shared lock on the store and hands it to the caller; the lock is held for as long as the
+// returned scope object lives.
+std::vector<RwLock::SharedLockScope>
+BuildStore::LockState(GcCtx& Ctx)
+{
+    ZEN_UNUSED(Ctx);
+
+    std::vector<RwLock::SharedLockScope> HeldLocks;
+    RwLock::SharedLockScope              StoreLock(m_Lock);
+    HeldLocks.emplace_back(std::move(StoreLock));
+    return HeldLocks;
+}
+
+// Scrub hook for the build store. Currently a stub: the scrub context is ignored and nothing is checked.
+void
+BuildStore::ScrubStorage(ScrubContext& ScrubCtx)
+{
+ ZEN_UNUSED(ScrubCtx);
+ // TODO: scrubbing is not implemented yet
+}
+
+// Reports the storage used by this store. Only the disk size of the metadata block store is filled in.
+// NOTE(review): the large and small blob stores are not included here - presumably they report their own
+// sizes, confirm.
+GcStorageSize
+BuildStore::StorageSize() const
+{
+    GcStorageSize Size;
+    Size.DiskSize = m_MetadataBlockStore.TotalSize();
+    return Size;
+}
+
+/*
+ ___________ __
+ \__ ___/___ _______/ |_ ______
+ | |_/ __ \ / ___/\ __\/ ___/
+ | |\ ___/ \___ \ | | \___ \
+ |____| \___ >____ > |__| /____ >
+ \/ \/ \/
+*/
+
+#if ZEN_WITH_TESTS
+
+// Stores compressed blobs, then reopens the store twice and checks that every blob written so far can be
+// read back, decoded and hashes to the key it was stored under.
+TEST_CASE("BuildStore.Blobs")
+{
+    ScopedTemporaryDirectory TempDir;
+
+    BuildStoreConfig Config;
+    Config.RootDirectory = TempDir.Path() / "build_store";
+
+    std::vector<IoHash> CompressedBlobsHashes;
+
+    // Compresses five semi random blobs (argument Base, Base + 7, ...) and stores them under their raw hash
+    const auto PutFiveBlobs = [&CompressedBlobsHashes](BuildStore& Store, size_t Base) {
+        for (size_t I = 0; I < 5; I++)
+        {
+            IoBuffer         Blob           = CreateSemiRandomBlob(Base + I * 7);
+            CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+            CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+            IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+            Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+            Store.PutBlob(CompressedBlobsHashes.back(), Payload);
+        }
+    };
+
+    // Reads back every blob stored so far and validates content type, header hash and decompressed content
+    const auto VerifyAllBlobs = [&CompressedBlobsHashes](BuildStore& Store) {
+        for (const IoHash& RawHash : CompressedBlobsHashes)
+        {
+            IoBuffer Payload = Store.GetBlob(RawHash);
+            CHECK(Payload);
+            CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
+            IoHash           VerifyRawHash;
+            uint64_t         VerifyRawSize;
+            CompressedBuffer CompressedBlob =
+                CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
+            CHECK(CompressedBlob);
+            CHECK(VerifyRawHash == RawHash);
+            IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
+            CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
+        }
+    };
+
+    // Fresh store: write, then read back
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+
+        PutFiveBlobs(Store, 4711);
+        VerifyAllBlobs(Store);
+    }
+    // Reopened store: existing blobs are still there, then add five more
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+
+        VerifyAllBlobs(Store);
+        PutFiveBlobs(Store, 5713);
+    }
+    // Reopened once more: all ten blobs are readable
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+
+        VerifyAllBlobs(Store);
+    }
+}
+
+namespace blockstore::testing {
+    // Builds a compact binary object with a "rawHash" field holding BlobHash and a "values" object holding
+    // one string field per key/value pair, and returns it as a buffer.
+    IoBuffer MakeMetaData(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues)
+    {
+        CbObjectWriter Builder;
+        Builder.AddHash("rawHash"sv, BlobHash);
+
+        Builder.BeginObject("values");
+        for (const auto& [Name, Value] : KeyValues)
+        {
+            Builder.AddString(Name, Value);
+        }
+        Builder.EndObject();  // values
+
+        return Builder.Save().GetBuffer().AsIoBuffer();
+    }
+
+}  // namespace blockstore::testing
+
+// Exercises metadata storage: metadata without blobs, blobs without metadata, metadata added to existing
+// blobs and metadata being replaced, reopening the store between the steps.
+TEST_CASE("BuildStore.Metadata")
+{
+    using namespace blockstore::testing;
+
+    ScopedTemporaryDirectory TempDir;
+
+    WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
+
+    BuildStoreConfig Config;
+    Config.RootDirectory = TempDir.Path() / "build_store";
+
+    // Fetches the metadata for Hashes and checks that it matches Expected entry by entry
+    const auto VerifyMetadata = [&WorkerPool](BuildStore& Store, const std::vector<IoHash>& Hashes, const std::vector<IoBuffer>& Expected) {
+        std::vector<IoBuffer> Fetched = Store.GetMetadatas(Hashes, &WorkerPool);
+        CHECK(Fetched.size() == Expected.size());
+        for (size_t I = 0; I < Fetched.size(); I++)
+        {
+            CHECK_EQ(IoHash::HashBuffer(Fetched[I]), IoHash::HashBuffer(Expected[I]));
+        }
+    };
+
+    // Reads every blob in Hashes, decompresses it and checks that the content hashes to its key
+    const auto VerifyBlobs = [](BuildStore& Store, const std::vector<IoHash>& Hashes) {
+        for (const IoHash& BlobHash : Hashes)
+        {
+            IoBuffer Blob = Store.GetBlob(BlobHash);
+            CHECK(Blob);
+            IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
+            CHECK(DecompressedBlob);
+            CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash);
+        }
+    };
+
+    std::vector<IoHash>   BlobHashes;
+    std::vector<IoBuffer> MetaPayloads;
+
+    // Metadata for hashes that have no blob
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+
+        for (size_t I = 0; I < 5; I++)
+        {
+            BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I)));
+            MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}}));
+            MetaPayloads.back().SetContentType(ZenContentType::kCbObject);
+        }
+        Store.PutMetadatas(BlobHashes, MetaPayloads);
+
+        VerifyMetadata(Store, BlobHashes, MetaPayloads);
+    }
+    // After reopening the metadata is still there and there are still no blobs for those hashes
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+
+        VerifyMetadata(Store, BlobHashes, MetaPayloads);
+        for (const IoHash& BlobHash : BlobHashes)
+        {
+            CHECK(!Store.GetBlob(BlobHash));
+        }
+    }
+
+    std::vector<IoHash> CompressedBlobsHashes;
+    // Blobs without metadata
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+        for (size_t I = 0; I < 5; I++)
+        {
+            IoBuffer         Blob           = CreateSemiRandomBlob(4711 + I * 7);
+            CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+            CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+            IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+            Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+            Store.PutBlob(CompressedBlobsHashes.back(), Payload);
+        }
+
+        const std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
+        for (const IoBuffer& MetadataPayload : MetadataPayloads)
+        {
+            CHECK(!MetadataPayload);
+        }
+        VerifyBlobs(Store, CompressedBlobsHashes);
+    }
+
+    std::vector<IoBuffer> BlobMetaPayloads;
+    // Attach metadata to the existing blobs
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+        for (const IoHash& BlobHash : CompressedBlobsHashes)
+        {
+            BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+            BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
+        }
+        Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+
+        VerifyMetadata(Store, CompressedBlobsHashes, BlobMetaPayloads);
+    }
+
+    // Reopen, verify both blobs and metadata, then replace the metadata
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+
+        VerifyMetadata(Store, CompressedBlobsHashes, BlobMetaPayloads);
+        VerifyBlobs(Store, CompressedBlobsHashes);
+
+        BlobMetaPayloads.clear();
+        for (const IoHash& BlobHash : CompressedBlobsHashes)
+        {
+            BlobMetaPayloads.push_back(
+                MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}}));
+            BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
+        }
+        Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+    }
+    // Reopen and verify that the replaced metadata is what we get back
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+
+        VerifyMetadata(Store, CompressedBlobsHashes, BlobMetaPayloads);
+        VerifyBlobs(Store, CompressedBlobsHashes);
+    }
+}
+
+// Runs garbage collection twice on a populated store: first with an expire time in the past and delete mode
+// off (everything must survive), then with an expire time in the future and delete mode on (everything
+// must be gone).
+TEST_CASE("BuildStore.GC")
+{
+    using namespace blockstore::testing;
+
+    ScopedTemporaryDirectory TempDir;
+
+    BuildStoreConfig Config;
+    Config.RootDirectory = TempDir.Path() / "build_store";
+
+    std::vector<IoHash>   CompressedBlobsHashes;
+    std::vector<IoBuffer> BlobMetaPayloads;
+
+    // Populate the store with five blobs and matching metadata
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+        for (size_t I = 0; I < 5; I++)
+        {
+            IoBuffer         Blob           = CreateSemiRandomBlob(4711 + I * 7);
+            CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+            CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+            IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+            Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+            Store.PutBlob(CompressedBlobsHashes.back(), Payload);
+        }
+        for (const IoHash& BlobHash : CompressedBlobsHashes)
+        {
+            BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+            BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
+        }
+        Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+    }
+
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+
+        // Pass 1: nothing is old enough and delete mode is off
+        {
+            GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
+                                                           .CollectSmallObjects  = false,
+                                                           .IsDeleteMode         = false,
+                                                           .Verbose              = true});
+            CHECK(!Result.WasCancelled);
+
+            for (const IoHash& BlobHash : CompressedBlobsHashes)
+            {
+                IoBuffer Blob = Store.GetBlob(BlobHash);
+                CHECK(Blob);
+                IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
+                CHECK(DecompressedBlob);
+                CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash);
+            }
+
+            const std::vector<IoBuffer> Fetched = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+            CHECK(Fetched.size() == BlobMetaPayloads.size());
+            for (size_t I = 0; I < Fetched.size(); I++)
+            {
+                CHECK(IoHash::HashBuffer(Fetched[I]) == IoHash::HashBuffer(BlobMetaPayloads[I]));
+            }
+        }
+
+        // Pass 2: everything counts as expired and delete mode is on
+        {
+            GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+                                                           .CollectSmallObjects  = true,
+                                                           .IsDeleteMode         = true,
+                                                           .Verbose              = true});
+            CHECK(!Result.WasCancelled);
+
+            for (const IoHash& BlobHash : CompressedBlobsHashes)
+            {
+                IoBuffer Blob = Store.GetBlob(BlobHash);
+                CHECK(!Blob);
+            }
+
+            const std::vector<IoBuffer> Fetched = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+            CHECK(Fetched.size() == BlobMetaPayloads.size());
+            for (const IoBuffer& MetadataPayload : Fetched)
+            {
+                CHECK(!MetadataPayload);
+            }
+        }
+    }
+}
+
+// Fills a store that has a 1 MiB disk limit with 64 blobs whose access times increase with their index,
+// then runs GC with nothing expired and expects the size limit to evict the 50 oldest blobs and their
+// metadata.
+TEST_CASE("BuildStore.SizeLimit")
+{
+    using namespace blockstore::testing;
+
+    ScopedTemporaryDirectory TempDir;
+
+    BuildStoreConfig Config = {.MaxDiskSpaceLimit = 1024u * 1024u};
+    Config.RootDirectory    = TempDir.Path() / "build_store";
+
+    std::vector<IoHash>   CompressedBlobsHashes;
+    std::vector<IoBuffer> BlobMetaPayloads;
+
+    // Populate
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+        for (size_t I = 0; I < 64; I++)
+        {
+            IoBuffer         Blob = CreateSemiRandomBlob(65537 + I * 7);
+            CompressedBuffer CompressedBlob =
+                CompressedBuffer::Compress(SharedBuffer(std::move(Blob)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+            CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+            IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+            Payload.SetContentType(ZenContentType::kCompressedBinary);
+            Store.PutBlob(CompressedBlobsHashes.back(), Payload);
+        }
+        for (const IoHash& BlobHash : CompressedBlobsHashes)
+        {
+            BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+            BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
+        }
+        Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+
+        // Blob I gets an access time I minutes into the future, so lower indexes are the oldest
+        for (size_t I = 0; I < 64; I++)
+        {
+            const IoHash& Key        = CompressedBlobsHashes[I];
+            GcClock::Tick AccessTick = (GcClock::Now() + std::chrono::minutes(I)).time_since_epoch().count();
+
+            Store.SetLastAccessTime(Key, AccessTime(AccessTick));
+        }
+    }
+
+    // Collect
+    {
+        GcManager  Gc;
+        BuildStore Store(Config, Gc);
+
+        GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
+                                                       .CollectSmallObjects  = true,
+                                                       .IsDeleteMode         = true,
+                                                       .Verbose              = true});
+
+        uint32_t DeletedBlobs = 0;
+
+        CHECK(!Result.WasCancelled);
+        for (const IoHash& BlobHash : CompressedBlobsHashes)
+        {
+            IoBuffer Blob = Store.GetBlob(BlobHash);
+            if (Blob)
+            {
+                IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
+                CHECK(DecompressedBlob);
+                CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash);
+            }
+            else
+            {
+                DeletedBlobs++;
+            }
+        }
+        CHECK(DeletedBlobs == 50);
+
+        // Metadata of the evicted (lowest index) blobs is gone, the rest is intact
+        const std::vector<IoBuffer> Fetched = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+        CHECK(Fetched.size() == BlobMetaPayloads.size());
+        for (size_t I = 0; I < Fetched.size(); I++)
+        {
+            const IoBuffer& MetadataPayload = Fetched[I];
+            if (I < DeletedBlobs)
+            {
+                CHECK(!MetadataPayload);
+            }
+            else
+            {
+                CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I]));
+            }
+        }
+    }
+}
+
+// Intentionally empty; only compiled when ZEN_WITH_TESTS is set.
+// NOTE(review): presumably referenced from elsewhere so that this translation unit and its test cases are
+// linked into the test binary - confirm.
+void
+buildstore_forcelink()
+{
+}
+
+#endif
+
+} // namespace zen