// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include # include # include #endif // ZEN_WITH_TESTS namespace zen { const FLLMTag& GetBuildstoreTag() { static FLLMTag _("store", FLLMTag("builds")); return _; } using namespace std::literals; namespace blobstore::impl { const std::string BaseName = "builds"; const std::string ManifestExtension = ".cbo"; const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; const char* AccessTimeExtension = ".zacs"; const uint32_t ManifestVersion = (2 << 16) | (0 << 8) | (0); std::filesystem::path GetManifestPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + ManifestExtension); } std::filesystem::path GetBlobIndexPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + IndexExtension); } std::filesystem::path GetBlobLogPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + LogExtension); } std::filesystem::path GetMetaIndexPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + "_meta" + IndexExtension); } std::filesystem::path GetMetaLogPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + "_meta" + LogExtension); } std::filesystem::path GetAccessTimesPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + AccessTimeExtension); } struct AccessTimeRecord { IoHash Key; std::uint32_t SecondsSinceEpoch = 0; }; static_assert(sizeof(AccessTimeRecord) == 24); #pragma pack(push) #pragma pack(1) struct AccessTimesHeader { static constexpr uint32_t ExpectedMagic = 0x7363617a; // 'zacs'; static constexpr uint32_t CurrentVersion = 1; static constexpr uint64_t DataAlignment = 8; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint32_t AccessTimeCount = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const AccessTimesHeader& Header) { return XXH32(&Header.Magic, sizeof(AccessTimesHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; #pragma pack(pop) static_assert(sizeof(AccessTimesHeader) == 16); } // namespace blobstore::impl BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc, CidStore& BlobStore) : m_Log(logging::Get("builds")) , m_Config(Config) , m_Gc(Gc) , m_BlobStore(BlobStore) { ZEN_TRACE_CPU("BuildStore::BuildStore"); ZEN_MEMSCOPE(GetBuildstoreTag()); try { bool IsNew = true; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("{} build store at {} in {}", IsNew ? "Initialized" : "Read", m_Config.RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::filesystem::path BlobLogPath = blobstore::impl::GetBlobLogPath(Config.RootDirectory); std::filesystem::path MetaLogPath = blobstore::impl::GetMetaLogPath(Config.RootDirectory); std::filesystem::path ManifestPath = blobstore::impl::GetManifestPath(Config.RootDirectory); std::filesystem::path AccessTimesPath = blobstore::impl::GetAccessTimesPath(Config.RootDirectory); if (IsFile(ManifestPath) && IsFile(BlobLogPath) && IsFile(MetaLogPath)) { IsNew = false; } if (!IsNew) { RwLock::ExclusiveLockScope Lock(m_Lock); CbValidateError ValidateResult = CbValidateError::None; if (CbObject ManifestReader = ValidateAndReadCompactBinaryObject(ReadFile(ManifestPath).Flatten(), ValidateResult); ValidateResult == CbValidateError::None && ManifestReader) { Oid ManifestId = ManifestReader["id"].AsObjectId(); uint32_t Version = ManifestReader["version"].AsUInt32(); DateTime CreationDate = ManifestReader["createdAt"].AsDateTime(); ZEN_UNUSED(CreationDate); if (ManifestId == Oid::Zero || Version != blobstore::impl::ManifestVersion) { ZEN_WARN("Invalid manifest at {}, wiping state", ManifestPath); IsNew = true; } else { m_BlobLogFlushPosition = ReadPayloadLog(Lock, BlobLogPath, 0); m_MetaLogFlushPosition = ReadMetadataLog(Lock, MetaLogPath, 0); if (IsFile(AccessTimesPath)) { ReadAccessTimes(Lock, AccessTimesPath); } } } else { ZEN_WARN("Invalid manifest at {} ('{}'), wiping state", ManifestPath, ToString(ValidateResult)); IsNew = true; } } if (IsNew) { CleanDirectory(Config.RootDirectory, /*ForceRemoveReadOnlyFiles*/ false); CbObjectWriter ManifestWriter; ManifestWriter.AddObjectId("id", Oid::NewOid()); ManifestWriter.AddInteger("version", blobstore::impl::ManifestVersion); ManifestWriter.AddDateTime("createdAt", DateTime::Now()); TemporaryFile::SafeWriteFile(ManifestPath, ManifestWriter.Save().GetBuffer().AsIoBuffer()); } m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite); m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite); m_Gc.AddGcReferencer(*this); m_Gc.AddGcReferenceLocker(*this); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to initialize build store. Reason: '{}'", Ex.what()); m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); } } BuildStore::~BuildStore() { try { ZEN_TRACE_CPU("BuildStore::~BuildStore"); m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); Flush(); m_MetadatalogFile.Close(); m_PayloadlogFile.Close(); } catch (const std::exception& Ex) { ZEN_ERROR("~BuildStore() threw exception: {}", Ex.what()); } } void BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) { ZEN_TRACE_CPU("BuildStore::PutBlob"); ZEN_MEMSCOPE(GetBuildstoreTag()); ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCompressedBinary); { RwLock::SharedLockScope _(m_Lock); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex BlobIndex = It->second; if (m_BlobEntries[BlobIndex].Payload) { return; } } } uint64_t PayloadSize = Payload.GetSize(); CidStore::InsertResult Result = m_BlobStore.AddChunk(Payload, BlobHash); PayloadEntry Entry = PayloadEntry(0, PayloadSize); ZEN_UNUSED(Result); IoHash MetadataHash; { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; if (Blob.Payload) { m_PayloadEntries[Blob.Payload] = Entry; } else { Blob.Payload = PayloadIndex(gsl::narrow(m_PayloadEntries.size())); m_PayloadEntries.push_back(Entry); } if (Blob.Metadata) { MetadataHash = m_MetadataEntries[Blob.Metadata].MetadataHash; } Blob.LastAccessTime = GcClock::TickCount(); } else { PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow(m_PayloadEntries.size())); m_PayloadEntries.push_back(Entry); const BlobIndex NewBlobIndex(gsl::narrow(m_BlobEntries.size())); // we only remove during GC and compact this then... m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } if (m_TrackedBlobKeys) { m_TrackedBlobKeys->push_back(BlobHash); if (MetadataHash != IoHash::Zero) { m_TrackedBlobKeys->push_back(MetadataHash); } } } m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); m_LastAccessTimeUpdateCount++; } IoBuffer BuildStore::GetBlob(const IoHash& BlobHash) { ZEN_TRACE_CPU("BuildStore::GetBlob"); ZEN_MEMSCOPE(GetBuildstoreTag()); RwLock::SharedLockScope Lock(m_Lock); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; Blob.LastAccessTime = GcClock::TickCount(); if (Blob.Payload) { Lock.ReleaseNow(); IoBuffer Chunk = m_BlobStore.FindChunkByCid(BlobHash); if (Chunk) { Chunk.SetContentType(ZenContentType::kCompressedBinary); return Chunk; } else { ZEN_WARN("Inconsistencies in build store, {} is in index but not in blob store", BlobHash); } } } return {}; } std::vector BuildStore::BlobsExists(std::span BlobHashes) { ZEN_TRACE_CPU("BuildStore::BlobsExists"); ZEN_MEMSCOPE(GetBuildstoreTag()); std::vector Result; Result.reserve(BlobHashes.size()); RwLock::SharedLockScope _(m_Lock); for (const IoHash& BlobHash : BlobHashes) { if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; const BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; bool HasPayload = !!Blob.Payload; bool HasMetadata = !!Blob.Metadata; Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata}); } else { Result.push_back({}); } } return Result; } void BuildStore::PutMetadatas(std::span BlobHashes, std::span Metadatas, WorkerThreadPool* OptionalWorkerPool) { ZEN_TRACE_CPU("BuildStore::PutMetadatas"); ZEN_MEMSCOPE(GetBuildstoreTag()); std::vector MetadataHashes; std::vector CompressedMetadataBuffers; auto CompressOne = [&BlobHashes, &MetadataHashes, &CompressedMetadataBuffers](const IoBuffer& Buffer, size_t Index) { if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; if (!CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize)) { throw std::runtime_error( fmt::format("Invalid compressed buffer provided when storing metadata for blob {}", BlobHashes[Index])); } else { CompressedMetadataBuffers[Index] = Buffer; MetadataHashes[Index] = RawHash; } } else { CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(Buffer), OodleCompressor::Mermaid, OodleCompressionLevel::None); MetadataHashes[Index] = Compressed.DecodeRawHash(); CompressedMetadataBuffers[Index] = std::move(Compressed.GetCompressed()).Flatten().AsIoBuffer(); CompressedMetadataBuffers[Index].SetContentType(ZenContentType::kCompressedBinary); } }; MetadataHashes.resize(Metadatas.size()); CompressedMetadataBuffers.resize(Metadatas.size()); if (OptionalWorkerPool) { std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); for (size_t Index = 0; Index < Metadatas.size(); Index++) { Work.ScheduleWork( *OptionalWorkerPool, [Index, &BlobHashes, &Metadatas, &MetadataHashes, &CompressedMetadataBuffers, &CompressOne](std::atomic&) { const IoBuffer& Buffer = Metadatas[Index]; CompressOne(Buffer, Index); }, {}); } Work.Wait(); } else { for (size_t Index = 0; Index < Metadatas.size(); Index++) { const IoBuffer& Buffer = Metadatas[Index]; CompressOne(Buffer, Index); } } std::vector AddedMetadataEntries; AddedMetadataEntries.reserve(MetadataHashes.size()); std::vector InsertResults = m_BlobStore.AddChunks(CompressedMetadataBuffers, MetadataHashes); ZEN_UNUSED(InsertResults); { RwLock::ExclusiveLockScope _(m_Lock); for (size_t Index = 0; Index < BlobHashes.size(); Index++) { const ZenContentType ContentType = Metadatas[Index].GetContentType(); const IoHash& BlobHash = BlobHashes[Index]; const IoHash& MetadataHash = MetadataHashes[Index]; const IoBuffer& Metadata = CompressedMetadataBuffers[Index]; MetadataEntry Entry(MetadataHash, Metadata.GetSize(), ContentType, 0); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; if (Blob.Metadata) { m_MetadataEntries[Blob.Metadata] = Entry; } else { Blob.Metadata = MetadataIndex(gsl::narrow(m_MetadataEntries.size())); m_MetadataEntries.push_back(Entry); } Blob.LastAccessTime = GcClock::TickCount(); } else { MetadataIndex NewMetadataIndex = MetadataIndex(gsl::narrow(m_MetadataEntries.size())); m_MetadataEntries.push_back(Entry); const BlobIndex NewBlobIndex(gsl::narrow(m_BlobEntries.size())); m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } m_LastAccessTimeUpdateCount++; if (m_TrackedBlobKeys) { m_TrackedBlobKeys->push_back(BlobHash); m_TrackedBlobKeys->push_back(MetadataHash); } AddedMetadataEntries.push_back(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); } } m_MetadatalogFile.Append(AddedMetadataEntries); } std::vector BuildStore::GetMetadatas(std::span BlobHashes, WorkerThreadPool* OptionalWorkerPool) { ZEN_TRACE_CPU("BuildStore::GetMetadatas"); ZEN_MEMSCOPE(GetBuildstoreTag()); std::vector MetadataHashes; std::vector MetaLocationResultIndexes; MetadataHashes.reserve(BlobHashes.size()); MetaLocationResultIndexes.reserve(BlobHashes.size()); tsl::robin_set ReferencedBlocks; std::vector Result; std::vector ResultContentTypes; Result.resize(BlobHashes.size()); ResultContentTypes.resize(BlobHashes.size(), ZenContentType::kUnknownContentType); { RwLock::SharedLockScope _(m_Lock); for (size_t Index = 0; Index < BlobHashes.size(); Index++) { const IoHash& BlobHash = BlobHashes[Index]; if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& ExistingBlobEntry = m_BlobEntries[ExistingBlobIndex]; if (ExistingBlobEntry.Metadata) { const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata]; MetadataHashes.push_back(ExistingMetadataEntry.MetadataHash); MetaLocationResultIndexes.push_back(Index); ResultContentTypes[Index] = ExistingMetadataEntry.GetContentType(); } ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::TickCount()); m_LastAccessTimeUpdateCount++; } } } m_BlobStore.IterateChunks( MetadataHashes, [this, &BlobHashes, &MetadataHashes, &MetaLocationResultIndexes, &Result](size_t Index, const IoBuffer& Payload) { if (Payload) { size_t ResultIndex = MetaLocationResultIndexes[Index]; IoHash RawHash; uint64_t RawSize; CompressedBuffer CompressedBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); if (CompressedBuffer) { IoBuffer Decompressed = CompressedBuffer.DecompressToComposite().Flatten().AsIoBuffer(); if (Decompressed) { Result[ResultIndex] = std::move(Decompressed); } else { ZEN_WARN("Metadata {} for blob {} is malformed (not a compressed binary format)", MetadataHashes[ResultIndex], BlobHashes[ResultIndex]); } } } return true; }, OptionalWorkerPool, 8u * 1024u); for (size_t Index = 0; Index < Result.size(); Index++) { if (Result[Index]) { Result[Index].SetContentType(ResultContentTypes[Index]); } } return Result; } void BuildStore::Flush() { ZEN_TRACE_CPU("BuildStore::Flush"); try { Stopwatch Timer; const auto _ = MakeGuard( [&] { ZEN_INFO("Flushed build store at {} in {}", m_Config.RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_BlobStore.Flush(); m_PayloadlogFile.Flush(); m_MetadatalogFile.Flush(); if (uint64_t LastAccessTimeUpdateCount = m_LastAccessTimeUpdateCount.load(); LastAccessTimeUpdateCount > 0) { m_LastAccessTimeUpdateCount -= LastAccessTimeUpdateCount; RwLock::ExclusiveLockScope UpdateLock(m_Lock); WriteAccessTimes(UpdateLock, blobstore::impl::GetAccessTimesPath(m_Config.RootDirectory)); } } catch (const std::exception& Ex) { ZEN_ERROR("BuildStore::Flush failed. Reason: {}", Ex.what()); } } BuildStore::StorageStats BuildStore::GetStorageStats() const { StorageStats Result; { RwLock::SharedLockScope _(m_Lock); Result.EntryCount = m_BlobLookup.size(); for (auto LookupIt : m_BlobLookup) { const BlobIndex ReadBlobIndex = LookupIt.second; const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; if (ReadBlobEntry.Payload) { const PayloadEntry& Payload = m_PayloadEntries[ReadBlobEntry.Payload]; uint64_t Size = Payload.GetSize(); Result.BlobCount++; Result.BlobBytes += Size; } if (ReadBlobEntry.Metadata) { const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata]; Result.MetadataCount++; Result.MetadataByteCount += Metadata.GetSize(); } } Result.BlobLogByteCount = m_PayloadlogFile.GetLogSize(); Result.MetadataLogByteCount = m_MetadatalogFile.GetLogSize(); } return Result; } #if ZEN_WITH_TESTS std::optional BuildStore::GetLastAccessTime(const IoHash& Key) const { RwLock::SharedLockScope _(m_Lock); if (auto It = m_BlobLookup.find(Key); It != m_BlobLookup.end()) { const BlobIndex Index = It->second; const BlobEntry& Entry = m_BlobEntries[Index]; return Entry.LastAccessTime; } return {}; } bool BuildStore::SetLastAccessTime(const IoHash& Key, const AccessTime& Time) { RwLock::SharedLockScope _(m_Lock); if (auto It = m_BlobLookup.find(Key); It != m_BlobLookup.end()) { const BlobIndex Index = It->second; BlobEntry& Entry = m_BlobEntries[Index]; Entry.LastAccessTime = Time; return true; } return false; } #endif // ZEN_WITH_TESTS void BuildStore::CompactState() { ZEN_TRACE_CPU("BuildStore::CompactState"); std::vector BlobEntries; std::vector PayloadEntries; std::vector MetadataEntries; tsl::robin_map BlobLookup; RwLock::ExclusiveLockScope _(m_Lock); const size_t EntryCount = m_BlobLookup.size(); BlobLookup.reserve(EntryCount); const size_t PayloadCount = m_PayloadEntries.size(); PayloadEntries.reserve(PayloadCount); const size_t MetadataCount = m_MetadataEntries.size(); MetadataEntries.reserve(MetadataCount); for (auto LookupIt : m_BlobLookup) { const IoHash& BlobHash = LookupIt.first; const BlobIndex ReadBlobIndex = LookupIt.second; const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; const BlobIndex WriteBlobIndex(gsl::narrow(BlobEntries.size())); BlobEntries.push_back(ReadBlobEntry); BlobEntry& WriteBlobEntry = BlobEntries.back(); if (WriteBlobEntry.Payload) { const PayloadEntry& ReadPayloadEntry = m_PayloadEntries[ReadBlobEntry.Payload]; WriteBlobEntry.Payload = PayloadIndex(gsl::narrow(PayloadEntries.size())); PayloadEntries.push_back(ReadPayloadEntry); } if (ReadBlobEntry.Metadata) { const MetadataEntry& ReadMetadataEntry = m_MetadataEntries[ReadBlobEntry.Metadata]; WriteBlobEntry.Metadata = MetadataIndex(gsl::narrow(MetadataEntries.size())); MetadataEntries.push_back(ReadMetadataEntry); } BlobLookup.insert({BlobHash, WriteBlobIndex}); } m_BlobEntries.swap(BlobEntries); m_PayloadEntries.swap(PayloadEntries); m_MetadataEntries.swap(MetadataEntries); m_BlobLookup.swap(BlobLookup); } uint64_t BuildStore::ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("BuildStore::ReadPayloadLog"); if (!IsFile(LogPath)) { return 0; } uint64_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read build store '{}' payload log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; if (!CasLog.IsValid(LogPath)) { RemoveFile(LogPath); return 0; } CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (!CasLog.Initialize()) { return 0; } const uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full payload log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const PayloadDiskEntry& Record) { std::string InvalidEntryReason; if (Record.Entry.GetFlags() & PayloadEntry::kTombStone) { // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end()) { if (!m_BlobEntries[ExistingIt->second].Metadata) { m_BlobLookup.erase(ExistingIt); } else { m_BlobEntries[ExistingIt->second].Payload = {}; } } return; } if (!ValidatePayloadDiskEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid payload entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; if (ExistingBlob.Payload) { const PayloadIndex ExistingPayloadIndex = ExistingBlob.Payload; m_PayloadEntries[ExistingPayloadIndex] = Record.Entry; } else { const PayloadIndex NewPayloadIndex(gsl::narrow(m_PayloadEntries.size())); m_PayloadEntries.push_back(Record.Entry); ExistingBlob.Payload = NewPayloadIndex; } } else { const PayloadIndex NewPayloadIndex(gsl::narrow(m_PayloadEntries.size())); m_PayloadEntries.push_back(Record.Entry); const BlobIndex NewBlobIndex(gsl::narrow(m_BlobEntries.size())); m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); } }, SkipEntryCount); if (InvalidEntryCount) { ZEN_WARN("found {} invalid payload entries in '{}'", InvalidEntryCount, LogPath); } return LogEntryCount; } uint64_t BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("BuildStore::ReadMetadataLog"); if (!IsFile(LogPath)) { return 0; } uint64_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read build store '{}' metadata log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; if (!CasLog.IsValid(LogPath)) { RemoveFile(LogPath); return 0; } CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (!CasLog.Initialize()) { return 0; } const uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full metadata log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const MetadataDiskEntry& Record) { std::string InvalidEntryReason; if (Record.Entry.GetFlags() & MetadataEntry::kTombStone) { // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end()) { if (!m_BlobEntries[ExistingIt->second].Payload) { m_BlobLookup.erase(ExistingIt); } else { m_BlobEntries[ExistingIt->second].Metadata = {}; } } return; } if (!ValidateMetadataDiskEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid metadata entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; if (ExistingBlob.Metadata) { const MetadataIndex ExistingMetadataIndex = ExistingBlob.Metadata; m_MetadataEntries[ExistingMetadataIndex] = Record.Entry; } else { const MetadataIndex NewMetadataIndex(gsl::narrow(m_MetadataEntries.size())); m_MetadataEntries.push_back(Record.Entry); ExistingBlob.Metadata = NewMetadataIndex; } } else { const MetadataIndex NewMetadataIndex(gsl::narrow(m_MetadataEntries.size())); m_MetadataEntries.push_back(Record.Entry); const BlobIndex NewBlobIndex(gsl::narrow(m_BlobEntries.size())); m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); } }, SkipEntryCount); if (InvalidEntryCount) { ZEN_WARN("found {} invalid metadata entries in '{}'", InvalidEntryCount, LogPath); } return LogEntryCount; } void BuildStore::ReadAccessTimes(const RwLock::ExclusiveLockScope&, const std::filesystem::path& AccessTimesPath) { ZEN_TRACE_CPU("BuildStore::ReadAccessTimes"); using namespace blobstore::impl; BasicFile AccessTimesFile; AccessTimesFile.Open(AccessTimesPath, BasicFile::Mode::kRead); uint64_t Size = AccessTimesFile.FileSize(); if (Size >= sizeof(AccessTimesHeader)) { AccessTimesHeader Header; uint64_t Offset = 0; AccessTimesFile.Read(&Header, sizeof(Header), 0); Offset += sizeof(AccessTimesHeader); Offset = RoundUp(Offset, AccessTimesHeader::DataAlignment); if ((Header.Magic == AccessTimesHeader::ExpectedMagic) && (Header.Version == AccessTimesHeader::CurrentVersion) && (Header.Checksum == AccessTimesHeader::ComputeChecksum(Header))) { uint64_t RecordsSize = sizeof(AccessTimeRecord) * Header.AccessTimeCount; if (AccessTimesFile.FileSize() >= Offset + RecordsSize) { std::vector AccessRecords(Header.AccessTimeCount); AccessTimesFile.Read(AccessRecords.data(), RecordsSize, Offset); for (const AccessTimeRecord& Record : AccessRecords) { const IoHash& Key = Record.Key; const uint32_t SecondsSinceEpoch = Record.SecondsSinceEpoch; if (auto It = m_BlobLookup.find(Key); It != m_BlobLookup.end()) { const BlobIndex Index = It->second; BlobEntry& Entry = m_BlobEntries[Index]; Entry.LastAccessTime.SetSecondsSinceEpoch(SecondsSinceEpoch); } else { m_LastAccessTimeUpdateCount++; } } } else { m_LastAccessTimeUpdateCount++; } } else { m_LastAccessTimeUpdateCount++; } } else { m_LastAccessTimeUpdateCount++; } } void BuildStore::WriteAccessTimes(const RwLock::ExclusiveLockScope&, const std::filesystem::path& AccessTimesPath) { ZEN_TRACE_CPU("BuildStore::WriteAccessTimes"); using namespace blobstore::impl; uint32_t Count = gsl::narrow(m_BlobLookup.size()); AccessTimesHeader Header = {.AccessTimeCount = Count}; Header.Checksum = AccessTimesHeader::ComputeChecksum(Header); TemporaryFile TempFile; std::error_code Ec; if (TempFile.CreateTemporary(AccessTimesPath.parent_path(), Ec); Ec) { throw std::runtime_error(fmt::format("Failed to create temporary file {} to write access times. Reason ({}) {}", TempFile.GetPath(), Ec.value(), Ec.message())); } { uint64_t Offset = 0; TempFile.Write(&Header, sizeof(AccessTimesHeader), Offset); Offset += sizeof(AccessTimesHeader); Offset = RoundUp(Offset, AccessTimesHeader::DataAlignment); std::vector AccessRecords; AccessRecords.reserve(Header.AccessTimeCount); for (auto It : m_BlobLookup) { const IoHash& Key = It.first; const BlobIndex Index = It.second; const BlobEntry& Entry = m_BlobEntries[Index]; const uint32_t SecondsSinceEpoch = Entry.LastAccessTime.GetSecondsSinceEpoch(); AccessRecords.emplace_back(AccessTimeRecord{.Key = Key, .SecondsSinceEpoch = SecondsSinceEpoch}); } uint64_t RecordsSize = sizeof(AccessTimeRecord) * Header.AccessTimeCount; TempFile.Write(AccessRecords.data(), RecordsSize, Offset); Offset += sizeof(AccessTimesHeader) * Header.AccessTimeCount; } if (TempFile.MoveTemporaryIntoPlace(AccessTimesPath, Ec); Ec) { throw std::runtime_error(fmt::format("Failed to move temporary file {} to {} when write access times. Reason ({}) {}", TempFile.GetPath(), AccessTimesPath, Ec.value(), Ec.message())); } } bool BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason) { if (Entry.BlobHash == IoHash::Zero) { OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString()); return false; } if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.GetFlags(), Entry.BlobHash.ToHexString()); return false; } if (Entry.Entry.GetFlags() & PayloadEntry::kTombStone) { return true; } if (Entry.Entry.GetSize() == 0 || Entry.Entry.GetSize() == 0x00ffffffffffffffu) { OutReason = fmt::format("Invalid size field {} for meta entry {}", Entry.Entry.GetSize(), Entry.BlobHash.ToHexString()); return false; } return true; } bool BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason) { if (Entry.BlobHash == IoHash::Zero) { OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString()); return false; } if (Entry.Entry.GetSize() == 0) { OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.GetSize()); return false; } if (Entry.Entry.GetFlags() & MetadataEntry::kTombStone) { return true; } if (Entry.Entry.GetContentType() == ZenContentType::kCOUNT) { OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString()); return false; } return true; } class BuildStoreGcReferenceChecker : public GcReferenceChecker { public: BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {} ~BuildStoreGcReferenceChecker() { try { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys.reset(); }); } catch (const std::exception& Ex) { ZEN_ERROR("~BuildStoreGcReferenceChecker threw exception: '{}'", Ex.what()); } } virtual std::string GetGcName(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); } virtual void PreCache(GcCtx& Ctx) override { ZEN_TRACE_CPU("Builds::PreCache"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: builds [PRECACHE] '{}': found {} references in {}", m_Store.m_Config.RootDirectory, m_PrecachedReferences.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys = std::make_unique>(); }); { m_PrecachedReferences.reserve(m_Store.m_BlobLookup.size()); RwLock::SharedLockScope __(m_Store.m_Lock); for (const auto& It : m_Store.m_BlobLookup) { const BuildStore::BlobIndex ExistingBlobIndex = It.second; const BuildStore::BlobEntry& Entry = m_Store.m_BlobEntries[ExistingBlobIndex]; if (Entry.Payload) { m_PrecachedReferences.push_back(It.first); } if (Entry.Metadata) { const BuildStore::MetadataEntry& MetadataEntry = m_Store.m_MetadataEntries[Entry.Metadata]; m_PrecachedReferences.push_back(MetadataEntry.MetadataHash); } } } FilterReferences(Ctx, fmt::format("buildstore [PRECACHE] '{}'", m_Store.m_Config.RootDirectory), m_PrecachedReferences); } virtual void UpdateLockedState(GcCtx& Ctx) override { ZEN_TRACE_CPU("Builds::UpdateLockedState"); ZEN_MEMSCOPE(GetBuildstoreTag()); ZEN_ASSERT(m_Store.m_TrackedBlobKeys); m_AddedReferences = std::move(*m_Store.m_TrackedBlobKeys); FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", m_Store.m_Config.RootDirectory), m_AddedReferences); } virtual std::span GetUnusedReferences(GcCtx& Ctx, std::span IoCids) override { ZEN_UNUSED(Ctx); ZEN_TRACE_CPU("Builds::GetUnusedReferences"); ZEN_MEMSCOPE(GetBuildstoreTag()); auto Log = [&Ctx]() { return Ctx.Logger; }; size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: buildstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", "buildstore", UsedCount, InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::span UnusedReferences = KeepUnusedReferences(m_PrecachedReferences, IoCids); UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences); UsedCount = IoCids.size() - UnusedReferences.size(); return UnusedReferences; } private: BuildStore& m_Store; std::vector m_PrecachedReferences; std::vector m_AddedReferences; }; std::string BuildStore::GetGcName(GcCtx& Ctx) { ZEN_UNUSED(Ctx); ZEN_MEMSCOPE(GetBuildstoreTag()); return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string()); } GcStoreCompactor* BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { ZEN_TRACE_CPU("Builds::RemoveExpiredData"); ZEN_MEMSCOPE(GetBuildstoreTag()); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: buildstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", m_Config.RootDirectory, Stats.CheckedCount, Stats.FoundCount, Stats.DeletedCount, NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } }); const GcClock::Tick ExpireTicks = Ctx.Settings.BuildStoreExpireTime.time_since_epoch().count(); std::vector ExpiredBlobs; tsl::robin_set SizeDroppedBlobs; { struct SizeInfo { const IoHash Key; uint32_t SecondsSinceEpoch = 0; uint64_t BlobSize = 0; }; bool DiskSizeExceeded = false; const uint64_t CurrentBlobsDiskSize = m_BlobStore.TotalSize().TotalSize; if ((m_Config.MaxDiskSpaceLimit > 0) && (CurrentBlobsDiskSize > m_Config.MaxDiskSpaceLimit)) { DiskSizeExceeded = true; } uint64_t ExpiredDataSize = 0; std::vector NonExpiredBlobSizeInfos; { RwLock::SharedLockScope __(m_Lock); if (DiskSizeExceeded) { NonExpiredBlobSizeInfos.reserve(m_BlobLookup.size()); } for (const auto& It : m_BlobLookup) { const BlobIndex ReadBlobIndex = It.second; const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; uint64_t Size = 0; if (ReadBlobEntry.Payload) { const PayloadEntry& Payload = m_PayloadEntries[ReadBlobEntry.Payload]; Size += Payload.GetSize(); } if (ReadBlobEntry.Metadata) { const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata]; Size += Metadata.GetSize(); } const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; if (AccessTick < ExpireTicks) { ExpiredBlobs.push_back(It.first); ExpiredDataSize += Size; } else if (DiskSizeExceeded) { NonExpiredBlobSizeInfos.emplace_back(SizeInfo{.Key = It.first, .SecondsSinceEpoch = ReadBlobEntry.LastAccessTime.GetSecondsSinceEpoch(), .BlobSize = Size}); } } Stats.CheckedCount += m_BlobLookup.size(); Stats.FoundCount += ExpiredBlobs.size(); } if (DiskSizeExceeded) { const uint64_t NewSizeLimit = m_Config.MaxDiskSpaceLimit - (m_Config.MaxDiskSpaceLimit >> 4); // Remove a bit more than just below the limit so we have some space to grow if ((CurrentBlobsDiskSize - ExpiredDataSize) > NewSizeLimit) { std::vector NonExpiredOrder; NonExpiredOrder.resize(NonExpiredBlobSizeInfos.size()); for (size_t Index = 0; Index < NonExpiredOrder.size(); Index++) { NonExpiredOrder[Index] = Index; } std::sort(NonExpiredOrder.begin(), NonExpiredOrder.end(), [&NonExpiredBlobSizeInfos](const size_t Lhs, const size_t Rhs) { const SizeInfo& LhsInfo = NonExpiredBlobSizeInfos[Lhs]; const SizeInfo& RhsInfo = NonExpiredBlobSizeInfos[Rhs]; return LhsInfo.SecondsSinceEpoch < RhsInfo.SecondsSinceEpoch; }); auto It = NonExpiredOrder.begin(); while (It != NonExpiredOrder.end()) { const SizeInfo& Info = NonExpiredBlobSizeInfos[*It]; if ((CurrentBlobsDiskSize - ExpiredDataSize) < NewSizeLimit) { break; } ExpiredDataSize += Info.BlobSize; ExpiredBlobs.push_back(Info.Key); SizeDroppedBlobs.insert(Info.Key); It++; } } } } std::vector RemovedBlobs; if (!ExpiredBlobs.empty()) { if (Ctx.Settings.IsDeleteMode) { RemovedBlobs.reserve(ExpiredBlobs.size()); std::vector RemovedPayloads; std::vector RemoveMetadatas; RwLock::ExclusiveLockScope __(m_Lock); if (Ctx.IsCancelledFlag.load()) { return nullptr; } for (const IoHash& ExpiredBlob : ExpiredBlobs) { if (auto It = m_BlobLookup.find(ExpiredBlob); It != m_BlobLookup.end()) { const BlobIndex ReadBlobIndex = It->second; const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; if (SizeDroppedBlobs.contains(ExpiredBlob) || (AccessTick < ExpireTicks)) { if (ReadBlobEntry.Payload) { RemovedPayloads.push_back( PayloadDiskEntry{.Entry = m_PayloadEntries[ReadBlobEntry.Payload], .BlobHash = ExpiredBlob}); RemovedPayloads.back().Entry.AddFlag(PayloadEntry::kTombStone); m_PayloadEntries[ReadBlobEntry.Payload] = {}; m_BlobEntries[ReadBlobIndex].Payload = {}; } if (ReadBlobEntry.Metadata) { RemoveMetadatas.push_back( MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob}); RemoveMetadatas.back().Entry.AddFlag(MetadataEntry::kTombStone); m_MetadataEntries[ReadBlobEntry.Metadata] = {}; m_BlobEntries[ReadBlobIndex].Metadata = {}; } m_BlobLookup.erase(It); m_LastAccessTimeUpdateCount++; RemovedBlobs.push_back(ExpiredBlob); Stats.DeletedCount++; } } } if (!RemovedPayloads.empty()) { m_PayloadlogFile.Append(RemovedPayloads); } if (!RemoveMetadatas.empty()) { m_MetadatalogFile.Append(RemoveMetadatas); } } } if (!RemovedBlobs.empty()) { CompactState(); } return nullptr; } std::vector BuildStore::CreateReferenceCheckers(GcCtx& Ctx) { ZEN_UNUSED(Ctx); ZEN_MEMSCOPE(GetBuildstoreTag()); return {new BuildStoreGcReferenceChecker(*this)}; } std::vector BuildStore::CreateReferenceValidators(GcCtx& Ctx) { ZEN_UNUSED(Ctx); return {}; } std::vector BuildStore::LockState(GcCtx& Ctx) { ZEN_UNUSED(Ctx); std::vector Locks; Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); return Locks; } /* ___________ __ \__ ___/___ _______/ |_ ______ | |_/ __ \ / ___/\ __\/ ___/ | |\ ___/ \___ \ | | \___ \ |____| \___ >____ > |__| /____ > \/ \/ \/ */ #if ZEN_WITH_TESTS TEST_CASE("BuildStore.Blobs") { ScopedTemporaryDirectory _; BuildStoreConfig Config; Config.RootDirectory = _.Path() / "build_store"; std::vector CompressedBlobsHashes; { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); Store.PutBlob(CompressedBlobsHashes.back(), Payload); } for (const IoHash& RawHash : CompressedBlobsHashes) { IoBuffer Payload = Store.GetBlob(RawHash); CHECK(Payload); CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); IoHash VerifyRawHash; uint64_t VerifyRawSize; CompressedBuffer CompressedBlob = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); CHECK(CompressedBlob); CHECK(VerifyRawHash == RawHash); IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); CHECK(IoHash::HashBuffer(Decompressed) == RawHash); } BlobStore.Flush(); } { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); for (const IoHash& RawHash : CompressedBlobsHashes) { IoBuffer Payload = Store.GetBlob(RawHash); CHECK(Payload); CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); IoHash VerifyRawHash; uint64_t VerifyRawSize; CompressedBuffer CompressedBlob = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); CHECK(CompressedBlob); CHECK(VerifyRawHash == RawHash); IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); CHECK(IoHash::HashBuffer(Decompressed) == RawHash); } for (size_t I = 0; I < 5; I++) { IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7); CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); Store.PutBlob(CompressedBlobsHashes.back(), Payload); } } { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); for (const IoHash& RawHash : CompressedBlobsHashes) { IoBuffer Payload = Store.GetBlob(RawHash); CHECK(Payload); CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); IoHash VerifyRawHash; uint64_t VerifyRawSize; CompressedBuffer CompressedBlob = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); CHECK(CompressedBlob); CHECK(VerifyRawHash == RawHash); IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); CHECK(IoHash::HashBuffer(Decompressed) == RawHash); } } } namespace blockstore::testing { IoBuffer MakeMetadata(const IoHash& BlobHash, const std::vector>& KeyValues) { CbObjectWriter Writer; Writer.AddHash("rawHash"sv, BlobHash); Writer.BeginObject("values"); { for (const auto& V : KeyValues) { Writer.AddString(V.first, V.second); } } Writer.EndObject(); // values return Writer.Save().GetBuffer().AsIoBuffer(); }; } // namespace blockstore::testing TEST_CASE("BuildStore.Metadata") { using namespace blockstore::testing; ScopedTemporaryDirectory _; WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); BuildStoreConfig Config; Config.RootDirectory = _.Path() / "build_store"; std::vector BlobHashes; std::vector MetaPayloads; { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I))); MetaPayloads.push_back(MakeMetadata(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); MetaPayloads.back().SetContentType(ZenContentType::kCbObject); } Store.PutMetadatas(BlobHashes, MetaPayloads, &WorkerPool); std::vector ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) { const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]); CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash); } } { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); std::vector ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) { const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]); CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash); } for (const IoHash& BlobHash : BlobHashes) { CHECK(!Store.GetBlob(BlobHash)); } } std::vector CompressedBlobsHashes; { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); Store.PutBlob(CompressedBlobsHashes.back(), Payload); } std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); for (const auto& MetadataIt : MetadataPayloads) { CHECK(!MetadataIt); } for (const IoHash& BlobHash : CompressedBlobsHashes) { IoBuffer Blob = Store.GetBlob(BlobHash); CHECK(Blob); IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); CHECK(DecompressedBlob); CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); } } std::vector BlobMetaPayloads; { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); for (const IoHash& BlobHash : CompressedBlobsHashes) { BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool); std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { const IoBuffer& MetadataPayload = MetadataPayloads[I]; CHECK_EQ(IoHash::HashBuffer(MetadataPayload), IoHash::HashBuffer(BlobMetaPayloads[I])); } } { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { const IoBuffer& MetadataPayload = MetadataPayloads[I]; CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); } for (const IoHash& BlobHash : CompressedBlobsHashes) { IoBuffer Blob = Store.GetBlob(BlobHash); CHECK(Blob); IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); CHECK(DecompressedBlob); CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); } BlobMetaPayloads.clear(); for (const IoHash& BlobHash : CompressedBlobsHashes) { BlobMetaPayloads.push_back( MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool); } { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { const IoBuffer& MetadataPayload = MetadataPayloads[I]; CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); } for (const IoHash& BlobHash : CompressedBlobsHashes) { IoBuffer Blob = Store.GetBlob(BlobHash); CHECK(Blob); IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); CHECK(DecompressedBlob); CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); } } } TEST_CASE("BuildStore.GC") { using namespace blockstore::testing; ScopedTemporaryDirectory _; BuildStoreConfig Config; Config.RootDirectory = _.Path() / "build_store"; std::vector CompressedBlobsHashes; std::vector BlobMetaPayloads; { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); Store.PutBlob(CompressedBlobsHashes.back(), Payload); } for (const IoHash& BlobHash : CompressedBlobsHashes) { BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr); } { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); { GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1), .CollectSmallObjects = false, .IsDeleteMode = false, .Verbose = true}); CHECK(!Result.WasCancelled); for (const IoHash& BlobHash : CompressedBlobsHashes) { IoBuffer Blob = Store.GetBlob(BlobHash); CHECK(Blob); IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); CHECK(DecompressedBlob); CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash); } std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { const IoBuffer& MetadataPayload = MetadataPayloads[I]; CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); } } { GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() + std::chrono::hours(1), .CollectSmallObjects = true, .IsDeleteMode = true, .Verbose = true}); CHECK(!Result.WasCancelled); for (const IoHash& BlobHash : CompressedBlobsHashes) { IoBuffer Blob = Store.GetBlob(BlobHash); CHECK(!Blob); } std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { const IoBuffer& MetadataPayload = MetadataPayloads[I]; CHECK(!MetadataPayload); } } } } TEST_CASE("BuildStore.SizeLimit") { using namespace blockstore::testing; ScopedTemporaryDirectory _; BuildStoreConfig Config = {.MaxDiskSpaceLimit = 1024u * 1024u}; Config.RootDirectory = _.Path() / "build_store"; std::vector CompressedBlobsHashes; std::vector BlobMetaPayloads; { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 64; I++) { IoBuffer Blob = CreateSemiRandomBlob(65537 + I * 7); CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)), OodleCompressor::Mermaid, OodleCompressionLevel::None); CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); Store.PutBlob(CompressedBlobsHashes.back(), Payload); } for (const IoHash& BlobHash : CompressedBlobsHashes) { BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr); { for (size_t I = 0; I < 64; I++) { const IoHash& Key = CompressedBlobsHashes[I]; GcClock::Tick AccessTick = (GcClock::Now() + std::chrono::minutes(I)).time_since_epoch().count(); Store.SetLastAccessTime(Key, AccessTime(AccessTick)); } } } { GcManager Gc; CidStore BlobStore(Gc); BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); BuildStore Store(Config, Gc, BlobStore); { GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1), .CollectSmallObjects = true, .IsDeleteMode = true, .Verbose = true}); uint32_t DeletedBlobs = 0; CHECK(!Result.WasCancelled); for (const IoHash& BlobHash : CompressedBlobsHashes) { IoBuffer Blob = Store.GetBlob(BlobHash); if (!Blob) { DeletedBlobs++; } else { IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); CHECK(DecompressedBlob); CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash); } } CHECK(DeletedBlobs == 53); std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { const IoBuffer& MetadataPayload = MetadataPayloads[I]; if (I < DeletedBlobs) { CHECK(!MetadataPayload); } else { CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); } } } } } void buildstore_forcelink() { } #endif } // namespace zen