// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include # include # include #endif // ZEN_WITH_TESTS namespace zen { const FLLMTag& GetBuildstoreTag() { static FLLMTag _("store", FLLMTag("builds")); return _; } using namespace std::literals; namespace blobstore::impl { const std::string BaseName = "builds"; const std::string ManifestExtension = ".cbo"; const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; const char* AccessTimeExtension = ".zacs"; const uint32_t ManifestVersion = (1 << 16) | (0 << 8) | (0); std::filesystem::path GetManifestPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + ManifestExtension); } std::filesystem::path GetBlobIndexPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + IndexExtension); } std::filesystem::path GetBlobLogPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + LogExtension); } std::filesystem::path GetMetaIndexPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + "_meta" + IndexExtension); } std::filesystem::path GetMetaLogPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + "_meta" + LogExtension); } std::filesystem::path GetAccessTimesPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + AccessTimeExtension); } struct AccessTimeRecord { IoHash Key; std::uint32_t SecondsSinceEpoch = 0; }; static_assert(sizeof(AccessTimeRecord) == 24); #pragma pack(push) #pragma pack(1) struct AccessTimesHeader { static constexpr uint32_t ExpectedMagic = 0x7363617a; // 'zacs'; static constexpr uint32_t CurrentVersion = 1; static constexpr uint64_t DataAlignment = 8; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint32_t 
AccessTimeCount = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const AccessTimesHeader& Header) { return XXH32(&Header.Magic, sizeof(AccessTimesHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; #pragma pack(pop) static_assert(sizeof(AccessTimesHeader) == 16); } // namespace blobstore::impl BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) : m_Log(logging::Get("builds")) , m_Config(Config) , m_Gc(Gc) , m_LargeBlobStore(m_Gc) , m_SmallBlobStore(Gc) , m_MetadataBlockStore() { ZEN_TRACE_CPU("BuildStore::BuildStore"); ZEN_MEMSCOPE(GetBuildstoreTag()); try { bool IsNew = true; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("{} build store at {} in {}", IsNew ? "Initialized" : "Read", m_Config.RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::filesystem::path BlobLogPath = blobstore::impl::GetBlobLogPath(Config.RootDirectory); std::filesystem::path MetaLogPath = blobstore::impl::GetMetaLogPath(Config.RootDirectory); std::filesystem::path ManifestPath = blobstore::impl::GetManifestPath(Config.RootDirectory); std::filesystem::path AccessTimesPath = blobstore::impl::GetAccessTimesPath(Config.RootDirectory); if (IsFile(ManifestPath) && IsFile(BlobLogPath) && IsFile(MetaLogPath)) { IsNew = false; } if (!IsNew) { RwLock::ExclusiveLockScope Lock(m_Lock); CbObject ManifestReader = LoadCompactBinaryObject(ReadFile(ManifestPath).Flatten()); Oid ManifestId = ManifestReader["id"].AsObjectId(); uint32_t Version = ManifestReader["version"].AsUInt32(); DateTime CreationDate = ManifestReader["createdAt"].AsDateTime(); ZEN_UNUSED(CreationDate); if (ManifestId == Oid::Zero || Version != blobstore::impl::ManifestVersion) { ZEN_WARN("Invalid manifest at {}, wiping state", ManifestPath); IsNew = true; } else { m_BlobLogFlushPosition = ReadPayloadLog(Lock, BlobLogPath, 0); m_MetaLogFlushPosition = ReadMetadataLog(Lock, MetaLogPath, 0); if (IsFile(AccessTimesPath)) { ReadAccessTimes(Lock, AccessTimesPath); } } } if (IsNew) { 
CleanDirectory(Config.RootDirectory, false); CbObjectWriter ManifestWriter; ManifestWriter.AddObjectId("id", Oid::NewOid()); ManifestWriter.AddInteger("version", blobstore::impl::ManifestVersion); ManifestWriter.AddDateTime("createdAt", DateTime::Now()); TemporaryFile::SafeWriteFile(ManifestPath, ManifestWriter.Save().GetBuffer().AsIoBuffer()); } m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew); m_SmallBlobStore.Initialize(Config.RootDirectory, "blob_cas", m_Config.SmallBlobBlockStoreMaxBlockSize, m_Config.SmallBlobBlockStoreAlignement, IsNew); m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20); { BlockStore::BlockIndexSet KnownBlocks; for (const BlobEntry& Blob : m_BlobEntries) { if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) { const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; KnownBlocks.insert(Metadata.Location.BlockIndex); } } m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); } m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite); m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite); m_Gc.AddGcReferencer(*this); m_Gc.AddGcReferenceLocker(*this); m_Gc.AddGcStorage(this); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to initialize build store. 
Reason: '{}'", Ex.what()); m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); } } BuildStore::~BuildStore() { try { ZEN_TRACE_CPU("BuildStore::~BuildStore"); m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); Flush(); m_MetadatalogFile.Close(); m_PayloadlogFile.Close(); } catch (const std::exception& Ex) { ZEN_ERROR("~BuildStore() threw exception: {}", Ex.what()); } } void BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) { ZEN_TRACE_CPU("BuildStore::PutBlob"); ZEN_MEMSCOPE(GetBuildstoreTag()); ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCompressedBinary); { RwLock::SharedLockScope _(m_Lock); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex BlobIndex = It->second; if (m_BlobEntries[BlobIndex].Payload) { return; } } } uint64_t PayloadSize = Payload.GetSize(); PayloadEntry Entry; if (Payload.GetSize() > m_Config.SmallBlobBlockStoreMaxBlockEmbedSize) { CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash); ZEN_UNUSED(Result); Entry = PayloadEntry(PayloadEntry::kStandalone, PayloadSize); } else { CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash); ZEN_UNUSED(Result); Entry = PayloadEntry(0, PayloadSize); } m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; if (Blob.Payload) { m_PayloadEntries[Blob.Payload] = Entry; } else { Blob.Payload = PayloadIndex(gsl::narrow(m_PayloadEntries.size())); m_PayloadEntries.push_back(Entry); } Blob.LastAccessTime = GcClock::TickCount(); } else { PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow(m_PayloadEntries.size())); m_PayloadEntries.push_back(Entry); const BlobIndex 
NewBlobIndex(gsl::narrow(m_BlobEntries.size())); // we only remove during GC and compact this then... m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } m_LastAccessTimeUpdateCount++; } IoBuffer BuildStore::GetBlob(const IoHash& BlobHash) { ZEN_TRACE_CPU("BuildStore::GetBlob"); ZEN_MEMSCOPE(GetBuildstoreTag()); RwLock::SharedLockScope Lock(m_Lock); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; Blob.LastAccessTime = GcClock::TickCount(); if (Blob.Payload) { const PayloadEntry& Entry = m_PayloadEntries[Blob.Payload]; const bool IsStandalone = (Entry.GetFlags() & PayloadEntry::kStandalone) != 0; Lock.ReleaseNow(); IoBuffer Chunk; if (IsStandalone) { ZEN_TRACE_CPU("GetLarge"); Chunk = m_LargeBlobStore.FindChunk(BlobHash); } else { ZEN_TRACE_CPU("GetSmall"); Chunk = m_SmallBlobStore.FindChunk(BlobHash); } if (Chunk) { Chunk.SetContentType(ZenContentType::kCompressedBinary); return Chunk; } else { ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? 
"on disk" : "in block"); } } } return {}; } std::vector BuildStore::BlobsExists(std::span BlobHashes) { ZEN_TRACE_CPU("BuildStore::BlobsExists"); ZEN_MEMSCOPE(GetBuildstoreTag()); std::vector Result; Result.reserve(BlobHashes.size()); RwLock::SharedLockScope _(m_Lock); for (const IoHash& BlobHash : BlobHashes) { if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; bool HasPayload = !!Blob.Payload; bool HasMetadata = !!Blob.Metadata; Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata}); } else { Result.push_back({}); } } return Result; } void BuildStore::PutMetadatas(std::span BlobHashes, std::span MetaDatas) { ZEN_TRACE_CPU("BuildStore::PutMetadatas"); ZEN_MEMSCOPE(GetBuildstoreTag()); size_t WriteBlobIndex = 0; m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span Locations) { RwLock::ExclusiveLockScope _(m_Lock); for (size_t LocationIndex = 0; LocationIndex < Locations.size(); LocationIndex++) { const IoBuffer& Data = MetaDatas[WriteBlobIndex]; const IoHash& BlobHash = BlobHashes[WriteBlobIndex]; const BlockStoreLocation& Location = Locations[LocationIndex]; MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0}; m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; if (Blob.Metadata) { m_MetadataEntries[Blob.Metadata] = Entry; } else { Blob.Metadata = MetadataIndex(gsl::narrow(m_MetadataEntries.size())); m_MetadataEntries.push_back(Entry); } Blob.LastAccessTime = GcClock::TickCount(); } else { MetadataIndex NewMetadataIndex = MetadataIndex(gsl::narrow(m_MetadataEntries.size())); m_MetadataEntries.push_back(Entry); const BlobIndex 
NewBlobIndex(gsl::narrow(m_BlobEntries.size())); m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } m_LastAccessTimeUpdateCount++; WriteBlobIndex++; if (m_TrackedCacheKeys) { m_TrackedCacheKeys->insert(BlobHash); } } }); } std::vector BuildStore::GetMetadatas(std::span BlobHashes, WorkerThreadPool* OptionalWorkerPool) { ZEN_TRACE_CPU("BuildStore::GetMetadatas"); ZEN_MEMSCOPE(GetBuildstoreTag()); std::vector MetaLocations; std::vector MetaLocationResultIndexes; MetaLocations.reserve(BlobHashes.size()); MetaLocationResultIndexes.reserve(BlobHashes.size()); tsl::robin_set ReferencedBlocks; std::vector Result; std::vector ResultContentTypes; Result.resize(BlobHashes.size()); ResultContentTypes.resize(BlobHashes.size(), ZenContentType::kUnknownContentType); { RwLock::SharedLockScope _(m_Lock); for (size_t Index = 0; Index < BlobHashes.size(); Index++) { const IoHash& BlobHash = BlobHashes[Index]; if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& ExistingBlobEntry = m_BlobEntries[ExistingBlobIndex]; if (ExistingBlobEntry.Metadata) { const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata]; MetaLocations.push_back(ExistingMetadataEntry.Location); MetaLocationResultIndexes.push_back(Index); ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex); ResultContentTypes[Index] = ExistingMetadataEntry.ContentType; } ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::TickCount()); m_LastAccessTimeUpdateCount++; } } } auto DoOneBlock = [&](std::span ChunkIndexes) { if (ChunkIndexes.size() < 4) { for (size_t ChunkIndex : ChunkIndexes) { IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]); if (Chunk) { size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; Result[ResultIndex] = std::move(Chunk); } } return true; 
} return m_MetadataBlockStore.IterateBlock( MetaLocations, ChunkIndexes, [&](size_t ChunkIndex, const void* Data, uint64_t Size) { if (Data != nullptr) { size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size); } return true; }, [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; Result[ResultIndex] = File.GetChunk(Offset, Size); return true; }, 8u * 1024u); }; if (!MetaLocations.empty()) { Latch WorkLatch(1); m_MetadataBlockStore.IterateChunks(MetaLocations, [&](uint32_t BlockIndex, std::span ChunkIndexes) -> bool { ZEN_UNUSED(BlockIndex); if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1) { return DoOneBlock(ChunkIndexes); } else { ZEN_ASSERT(OptionalWorkerPool != nullptr); WorkLatch.AddCount(1); try { OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector(ChunkIndexes.begin(), ChunkIndexes.end())]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); try { DoOneBlock(ChunkIndexes); } catch (const std::exception& Ex) { ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); } }); } catch (const std::exception& Ex) { WorkLatch.CountDown(); ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. 
Reason: {}", ChunkIndexes.size(), Ex.what()); } return true; } }); WorkLatch.CountDown(); WorkLatch.Wait(); } for (size_t Index = 0; Index < Result.size(); Index++) { if (Result[Index]) { Result[Index].SetContentType(ResultContentTypes[Index]); } } return Result; } void BuildStore::Flush() { ZEN_TRACE_CPU("BuildStore::Flush"); try { Stopwatch Timer; const auto _ = MakeGuard( [&] { ZEN_INFO("Flushed build store at {} in {}", m_Config.RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_LargeBlobStore.Flush(); m_SmallBlobStore.Flush(); m_MetadataBlockStore.Flush(false); m_PayloadlogFile.Flush(); m_MetadatalogFile.Flush(); if (uint64_t LastAccessTimeUpdateCount = m_LastAccessTimeUpdateCount.load(); LastAccessTimeUpdateCount > 0) { m_LastAccessTimeUpdateCount -= LastAccessTimeUpdateCount; RwLock::ExclusiveLockScope UpdateLock(m_Lock); WriteAccessTimes(UpdateLock, blobstore::impl::GetAccessTimesPath(m_Config.RootDirectory)); } } catch (const std::exception& Ex) { ZEN_ERROR("BuildStore::Flush failed. 
Reason: {}", Ex.what()); } } BuildStore::StorageStats BuildStore::GetStorageStats() const { StorageStats Result; { RwLock::SharedLockScope _(m_Lock); Result.EntryCount = m_BlobLookup.size(); for (auto LookupIt : m_BlobLookup) { const BlobIndex ReadBlobIndex = LookupIt.second; const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; if (ReadBlobEntry.Payload) { const PayloadEntry& Payload = m_PayloadEntries[ReadBlobEntry.Payload]; uint64_t Size = Payload.GetSize(); if ((Payload.GetFlags() & PayloadEntry::kStandalone) != 0) { Result.LargeBlobCount++; Result.LargeBlobBytes += Size; } else { Result.SmallBlobCount++; Result.SmallBlobBytes += Size; } } if (ReadBlobEntry.Metadata) { const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata]; Result.MetadataCount++; Result.MetadataByteCount += Metadata.Location.Size; } } } return Result; } #if ZEN_WITH_TESTS std::optional BuildStore::GetLastAccessTime(const IoHash& Key) const { RwLock::SharedLockScope _(m_Lock); if (auto It = m_BlobLookup.find(Key); It != m_BlobLookup.end()) { const BlobIndex Index = It->second; const BlobEntry& Entry = m_BlobEntries[Index]; return Entry.LastAccessTime; } return {}; } bool BuildStore::SetLastAccessTime(const IoHash& Key, const AccessTime& Time) { RwLock::SharedLockScope _(m_Lock); if (auto It = m_BlobLookup.find(Key); It != m_BlobLookup.end()) { const BlobIndex Index = It->second; BlobEntry& Entry = m_BlobEntries[Index]; Entry.LastAccessTime = Time; return true; } return false; } #endif // ZEN_WITH_TESTS void BuildStore::CompactState() { ZEN_TRACE_CPU("BuildStore::CompactState"); std::vector BlobEntries; std::vector PayloadEntries; std::vector MetadataEntries; tsl::robin_map BlobLookup; RwLock::ExclusiveLockScope _(m_Lock); const size_t EntryCount = m_BlobLookup.size(); BlobLookup.reserve(EntryCount); const size_t PayloadCount = m_PayloadEntries.size(); PayloadEntries.reserve(PayloadCount); const size_t MetadataCount = m_MetadataEntries.size(); 
MetadataEntries.reserve(MetadataCount); for (auto LookupIt : m_BlobLookup) { const IoHash& BlobHash = LookupIt.first; const BlobIndex ReadBlobIndex = LookupIt.second; const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; const BlobIndex WriteBlobIndex(gsl::narrow(BlobEntries.size())); BlobEntries.push_back(ReadBlobEntry); BlobEntry& WriteBlobEntry = BlobEntries.back(); if (WriteBlobEntry.Payload) { const PayloadEntry& ReadPayloadEntry = m_PayloadEntries[ReadBlobEntry.Payload]; WriteBlobEntry.Payload = PayloadIndex(gsl::narrow(PayloadEntries.size())); PayloadEntries.push_back(ReadPayloadEntry); } if (ReadBlobEntry.Metadata) { const MetadataEntry& ReadMetadataEntry = m_MetadataEntries[ReadBlobEntry.Metadata]; WriteBlobEntry.Metadata = MetadataIndex(gsl::narrow(MetadataEntries.size())); MetadataEntries.push_back(ReadMetadataEntry); } BlobLookup.insert({BlobHash, WriteBlobIndex}); } m_BlobEntries.swap(BlobEntries); m_PayloadEntries.swap(PayloadEntries); m_MetadataEntries.swap(MetadataEntries); m_BlobLookup.swap(BlobLookup); } uint64_t BuildStore::ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("BuildStore::ReadPayloadLog"); if (!IsFile(LogPath)) { return 0; } uint64_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read build store '{}' payload log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; if (!CasLog.IsValid(LogPath)) { RemoveFile(LogPath); return 0; } CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (!CasLog.Initialize()) { return 0; } const uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full payload log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const PayloadDiskEntry& 
Record) { std::string InvalidEntryReason; if (Record.Entry.GetFlags() & PayloadEntry::kTombStone) { // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end()) { if (!m_BlobEntries[ExistingIt->second].Metadata) { m_BlobLookup.erase(ExistingIt); } else { m_BlobEntries[ExistingIt->second].Payload = {}; } } return; } if (!ValidatePayloadDiskEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid payload entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; if (ExistingBlob.Payload) { const PayloadIndex ExistingPayloadIndex = ExistingBlob.Payload; m_PayloadEntries[ExistingPayloadIndex] = Record.Entry; } else { const PayloadIndex NewPayloadIndex(gsl::narrow(m_PayloadEntries.size())); m_PayloadEntries.push_back(Record.Entry); ExistingBlob.Payload = NewPayloadIndex; } } else { const PayloadIndex NewPayloadIndex(gsl::narrow(m_PayloadEntries.size())); m_PayloadEntries.push_back(Record.Entry); const BlobIndex NewBlobIndex(gsl::narrow(m_BlobEntries.size())); m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); } }, SkipEntryCount); if (InvalidEntryCount) { ZEN_WARN("found {} invalid payload entries in '{}'", InvalidEntryCount, LogPath); } return LogEntryCount; } uint64_t BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("BuildStore::ReadMetadataLog"); if (!IsFile(LogPath)) { return 0; } uint64_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read build store 
'{}' metadata log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; if (!CasLog.IsValid(LogPath)) { RemoveFile(LogPath); return 0; } CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (!CasLog.Initialize()) { return 0; } const uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full metadata log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const MetadataDiskEntry& Record) { std::string InvalidEntryReason; if (Record.Entry.Flags & MetadataEntry::kTombStone) { // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end()) { if (!m_BlobEntries[ExistingIt->second].Payload) { m_BlobLookup.erase(ExistingIt); } else { m_BlobEntries[ExistingIt->second].Metadata = {}; } } return; } if (!ValidateMetadataDiskEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid metadata entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) { const BlobIndex ExistingBlobIndex = It->second; BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; if (ExistingBlob.Metadata) { const MetadataIndex ExistingMetadataIndex = ExistingBlob.Metadata; m_MetadataEntries[ExistingMetadataIndex] = Record.Entry; } else { const MetadataIndex NewMetadataIndex(gsl::narrow(m_MetadataEntries.size())); m_MetadataEntries.push_back(Record.Entry); ExistingBlob.Metadata = NewMetadataIndex; } } else { const MetadataIndex 
NewMetadataIndex(gsl::narrow(m_MetadataEntries.size())); m_MetadataEntries.push_back(Record.Entry); const BlobIndex NewBlobIndex(gsl::narrow(m_BlobEntries.size())); m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); } }, SkipEntryCount); if (InvalidEntryCount) { ZEN_WARN("found {} invalid metadata entries in '{}'", InvalidEntryCount, LogPath); } return LogEntryCount; } void BuildStore::ReadAccessTimes(const RwLock::ExclusiveLockScope&, const std::filesystem::path& AccessTimesPath) { ZEN_TRACE_CPU("BuildStore::ReadAccessTimes"); using namespace blobstore::impl; BasicFile AccessTimesFile; AccessTimesFile.Open(AccessTimesPath, BasicFile::Mode::kRead); uint64_t Size = AccessTimesFile.FileSize(); if (Size >= sizeof(AccessTimesHeader)) { AccessTimesHeader Header; uint64_t Offset = 0; AccessTimesFile.Read(&Header, sizeof(Header), 0); Offset += sizeof(AccessTimesHeader); Offset = RoundUp(Offset, AccessTimesHeader::DataAlignment); if ((Header.Magic == AccessTimesHeader::ExpectedMagic) && (Header.Version == AccessTimesHeader::CurrentVersion) && (Header.Checksum == AccessTimesHeader::ComputeChecksum(Header))) { uint64_t RecordsSize = sizeof(AccessTimeRecord) * Header.AccessTimeCount; if (AccessTimesFile.FileSize() >= Offset + RecordsSize) { std::vector AccessRecords(Header.AccessTimeCount); AccessTimesFile.Read(AccessRecords.data(), RecordsSize, Offset); for (const AccessTimeRecord& Record : AccessRecords) { const IoHash& Key = Record.Key; const uint32_t SecondsSinceEpoch = Record.SecondsSinceEpoch; if (auto It = m_BlobLookup.find(Key); It != m_BlobLookup.end()) { const BlobIndex Index = It->second; BlobEntry& Entry = m_BlobEntries[Index]; Entry.LastAccessTime.SetSecondsSinceEpoch(SecondsSinceEpoch); } else { m_LastAccessTimeUpdateCount++; } } } else { m_LastAccessTimeUpdateCount++; } } else { m_LastAccessTimeUpdateCount++; } } else { 
m_LastAccessTimeUpdateCount++; } } void BuildStore::WriteAccessTimes(const RwLock::ExclusiveLockScope&, const std::filesystem::path& AccessTimesPath) { ZEN_TRACE_CPU("BuildStore::WriteAccessTimes"); using namespace blobstore::impl; uint32_t Count = gsl::narrow(m_BlobLookup.size()); AccessTimesHeader Header = {.AccessTimeCount = Count}; Header.Checksum = AccessTimesHeader::ComputeChecksum(Header); TemporaryFile TempFile; std::error_code Ec; if (TempFile.CreateTemporary(AccessTimesPath.parent_path(), Ec); Ec) { throw std::runtime_error(fmt::format("Failed to create temporary file {} to write access times. Reason ({}) {}", TempFile.GetPath(), Ec.value(), Ec.message())); } { uint64_t Offset = 0; TempFile.Write(&Header, sizeof(AccessTimesHeader), Offset); Offset += sizeof(AccessTimesHeader); Offset = RoundUp(Offset, AccessTimesHeader::DataAlignment); std::vector AccessRecords; AccessRecords.reserve(Header.AccessTimeCount); for (auto It : m_BlobLookup) { const IoHash& Key = It.first; const BlobIndex Index = It.second; const BlobEntry& Entry = m_BlobEntries[Index]; const uint32_t SecondsSinceEpoch = Entry.LastAccessTime.GetSecondsSinceEpoch(); AccessRecords.emplace_back(AccessTimeRecord{.Key = Key, .SecondsSinceEpoch = SecondsSinceEpoch}); } uint64_t RecordsSize = sizeof(AccessTimeRecord) * Header.AccessTimeCount; TempFile.Write(AccessRecords.data(), RecordsSize, Offset); Offset += sizeof(AccessTimesHeader) * Header.AccessTimeCount; } if (TempFile.MoveTemporaryIntoPlace(AccessTimesPath, Ec); Ec) { throw std::runtime_error(fmt::format("Failed to move temporary file {} to {} when write access times. 
Reason ({}) {}", TempFile.GetPath(), AccessTimesPath, Ec.value(), Ec.message())); } } bool BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason) { if (Entry.BlobHash == IoHash::Zero) { OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString()); return false; } if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.GetFlags(), Entry.BlobHash.ToHexString()); return false; } if (Entry.Entry.GetFlags() & PayloadEntry::kTombStone) { return true; } if (Entry.Entry.GetSize() == 0 || Entry.Entry.GetSize() == 0x00ffffffffffffffu) { OutReason = fmt::format("Invalid size field {} for meta entry {}", Entry.Entry.GetSize(), Entry.BlobHash.ToHexString()); return false; } return true; } bool BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason) { if (Entry.BlobHash == IoHash::Zero) { OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString()); return false; } if (Entry.Entry.Location.Size == 0) { OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.Location.Size); return false; } if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0) { OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); return false; } if (Entry.Entry.Flags & MetadataEntry::kTombStone) { return true; } if (Entry.Entry.ContentType == ZenContentType::kCOUNT) { OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString()); return false; } if (Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0) { OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); return false; } return true; } class BuildStoreGcReferenceChecker : public GcReferenceChecker { public: 
BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {} virtual std::string GetGcName(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); } virtual void PreCache(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } virtual void UpdateLockedState(GcCtx& Ctx) override { ZEN_TRACE_CPU("Builds::UpdateLockedState"); ZEN_MEMSCOPE(GetBuildstoreTag()); auto Log = [&Ctx]() { return Ctx.Logger; }; m_References.reserve(m_Store.m_BlobLookup.size()); for (const auto& It : m_Store.m_BlobLookup) { const BuildStore::BlobIndex ExistingBlobIndex = It.second; if (m_Store.m_BlobEntries[ExistingBlobIndex].Payload) { m_References.push_back(It.first); } } FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore"), m_References); } virtual std::span GetUnusedReferences(GcCtx& Ctx, std::span IoCids) override { ZEN_UNUSED(Ctx); ZEN_TRACE_CPU("Builds::GetUnusedReferences"); ZEN_MEMSCOPE(GetBuildstoreTag()); auto Log = [&Ctx]() { return Ctx.Logger; }; size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: buildstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", "buildstore", UsedCount, InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::span UnusedReferences = KeepUnusedReferences(m_References, IoCids); UsedCount = IoCids.size() - UnusedReferences.size(); return UnusedReferences; } private: BuildStore& m_Store; std::vector m_References; }; std::string BuildStore::GetGcName(GcCtx& Ctx) { ZEN_UNUSED(Ctx); ZEN_MEMSCOPE(GetBuildstoreTag()); return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string()); } class BuildStoreGcCompator : public GcStoreCompactor { using BlobEntry = BuildStore::BlobEntry; using PayloadEntry = BuildStore::PayloadEntry; using MetadataEntry = BuildStore::MetadataEntry; using MetadataDiskEntry = 
BuildStore::MetadataDiskEntry; using BlobIndex = BuildStore::BlobIndex; using PayloadIndex = BuildStore::PayloadIndex; using MetadataIndex = BuildStore::MetadataIndex; public: BuildStoreGcCompator(BuildStore& Store, std::vector&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {} virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function& ClaimDiskReserveCallback) override { ZEN_UNUSED(ClaimDiskReserveCallback); ZEN_TRACE_CPU("Builds::CompactStore"); ZEN_MEMSCOPE(GetBuildstoreTag()); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}", m_Store.m_Config.RootDirectory, NiceBytes(Stats.RemovedDisk), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); const auto __ = MakeGuard([&] { m_Store.Flush(); }); if (!m_RemovedBlobs.empty()) { if (Ctx.Settings.CollectSmallObjects) { m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique(); }); auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); }); BlockStore::BlockUsageMap BlockUsage; { RwLock::SharedLockScope __(m_Store.m_Lock); for (auto LookupIt : m_Store.m_BlobLookup) { const BlobIndex ReadBlobIndex = LookupIt.second; const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex]; if (ReadBlobEntry.Metadata) { const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata]; uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex; uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement); if (auto BlockUsageIt = BlockUsage.find(BlockIndex); BlockUsageIt != BlockUsage.end()) { BlockStore::BlockUsageInfo& Info = BlockUsageIt.value(); Info.EntryCount++; Info.DiskUsage += ChunkSize; } else { BlockUsage.insert_or_assign(BlockIndex, 
BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); } } } } BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90); BlockStoreCompactState BlockCompactState; std::vector BlockCompactStateKeys; BlockCompactState.IncludeBlocks(BlocksToCompact); if (BlocksToCompact.size() > 0) { { RwLock::SharedLockScope ___(m_Store.m_Lock); for (const auto& Entry : m_Store.m_BlobLookup) { BlobIndex Index = Entry.second; if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) { if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location)) { BlockCompactStateKeys.push_back(Entry.first); } } } } if (Ctx.Settings.IsDeleteMode) { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks", m_Store.m_Config.RootDirectory, BlocksToCompact.size()); } m_Store.m_MetadataBlockStore.CompactBlocks( BlockCompactState, m_Store.m_Config.MetadataBlockStoreAlignement, [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { std::vector MovedEntries; MovedEntries.reserve(MovedArray.size()); RwLock::ExclusiveLockScope _(m_Store.m_Lock); for (const std::pair& Moved : MovedArray) { size_t ChunkIndex = Moved.first; const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; ZEN_ASSERT(m_Store.m_TrackedCacheKeys); if (m_Store.m_TrackedCacheKeys->contains(Key)) { continue; } if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end()) { const BlobIndex Index = It->second; if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) { m_Store.m_MetadataEntries[Meta].Location = Moved.second; MovedEntries.push_back( MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key}); } } } m_Store.m_MetadatalogFile.Append(MovedEntries); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) { return false; } return true; }, ClaimDiskReserveCallback, fmt::format("GCV2: buildstore [COMPACT] '{}': ", 
m_Store.m_Config.RootDirectory)); } else { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks", m_Store.m_Config.RootDirectory, BlocksToCompact.size()); } } } } } } virtual std::string GetGcName(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); ZEN_MEMSCOPE(GetBuildstoreTag()); return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); } private: BuildStore& m_Store; const std::vector m_RemovedBlobs; }; GcStoreCompactor* BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { ZEN_TRACE_CPU("Builds::RemoveExpiredData"); ZEN_MEMSCOPE(GetBuildstoreTag()); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: buildstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", m_Config.RootDirectory, Stats.CheckedCount, Stats.FoundCount, Stats.DeletedCount, NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } }); const GcClock::Tick ExpireTicks = Ctx.Settings.BuildStoreExpireTime.time_since_epoch().count(); std::vector ExpiredBlobs; tsl::robin_set SizeDroppedBlobs; { struct SizeInfo { const IoHash Key; uint32_t SecondsSinceEpoch = 0; uint64_t BlobSize = 0; }; bool DiskSizeExceeded = false; const uint64_t CurrentDiskSize = m_LargeBlobStore.StorageSize().DiskSize + m_SmallBlobStore.StorageSize().DiskSize + m_MetadataBlockStore.TotalSize(); if (CurrentDiskSize > m_Config.MaxDiskSpaceLimit) { DiskSizeExceeded = true; } uint64_t ExpiredDataSize = 0; std::vector NonExpiredBlobSizeInfos; { RwLock::SharedLockScope __(m_Lock); if (DiskSizeExceeded) { NonExpiredBlobSizeInfos.reserve(m_BlobLookup.size()); } for (const auto& It : m_BlobLookup) { const BlobIndex ReadBlobIndex = It.second; const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; uint64_t Size = 0; if (ReadBlobEntry.Payload) { const PayloadEntry& Payload = m_PayloadEntries[ReadBlobEntry.Payload]; Size += 
Payload.GetSize(); } if (ReadBlobEntry.Metadata) { const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata]; Size += Metadata.Location.Size; } const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; if (AccessTick < ExpireTicks) { ExpiredBlobs.push_back(It.first); ExpiredDataSize += ExpiredDataSize; } else if (DiskSizeExceeded) { NonExpiredBlobSizeInfos.emplace_back(SizeInfo{.Key = It.first, .SecondsSinceEpoch = ReadBlobEntry.LastAccessTime.GetSecondsSinceEpoch(), .BlobSize = Size}); } } Stats.CheckedCount += m_BlobLookup.size(); Stats.FoundCount += ExpiredBlobs.size(); } if (DiskSizeExceeded) { const uint64_t NewSizeLimit = m_Config.MaxDiskSpaceLimit - (m_Config.MaxDiskSpaceLimit >> 4); // Remove a bit more than just below the limit so we have some space to grow if ((CurrentDiskSize - ExpiredDataSize) > NewSizeLimit) { std::vector NonExpiredOrder; NonExpiredOrder.resize(NonExpiredBlobSizeInfos.size()); for (size_t Index = 0; Index < NonExpiredOrder.size(); Index++) { NonExpiredOrder[Index] = Index; } std::sort(NonExpiredOrder.begin(), NonExpiredOrder.end(), [&NonExpiredBlobSizeInfos](const size_t Lhs, const size_t Rhs) { const SizeInfo& LhsInfo = NonExpiredBlobSizeInfos[Lhs]; const SizeInfo& RhsInfo = NonExpiredBlobSizeInfos[Rhs]; return LhsInfo.SecondsSinceEpoch < RhsInfo.SecondsSinceEpoch; }); auto It = NonExpiredOrder.begin(); while (It != NonExpiredOrder.end()) { const SizeInfo& Info = NonExpiredBlobSizeInfos[*It]; if ((CurrentDiskSize - ExpiredDataSize) < NewSizeLimit) { break; } ExpiredDataSize += Info.BlobSize; ExpiredBlobs.push_back(Info.Key); SizeDroppedBlobs.insert(Info.Key); It++; } } } } std::vector RemovedBlobs; if (!ExpiredBlobs.empty()) { if (Ctx.Settings.IsDeleteMode) { RemovedBlobs.reserve(ExpiredBlobs.size()); std::vector RemovedPayloads; std::vector RemoveMetadatas; RwLock::ExclusiveLockScope __(m_Lock); if (Ctx.IsCancelledFlag.load()) { return nullptr; } for (const IoHash& ExpiredBlob : ExpiredBlobs) { if (auto It = 
m_BlobLookup.find(ExpiredBlob); It != m_BlobLookup.end()) { const BlobIndex ReadBlobIndex = It->second; const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; if (SizeDroppedBlobs.contains(ExpiredBlob) || (AccessTick < ExpireTicks)) { if (ReadBlobEntry.Payload) { RemovedPayloads.push_back( PayloadDiskEntry{.Entry = m_PayloadEntries[ReadBlobEntry.Payload], .BlobHash = ExpiredBlob}); RemovedPayloads.back().Entry.AddFlag(PayloadEntry::kTombStone); m_PayloadEntries[ReadBlobEntry.Payload] = {}; m_BlobEntries[ReadBlobIndex].Payload = {}; } if (ReadBlobEntry.Metadata) { RemoveMetadatas.push_back( MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob}); RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; m_MetadataEntries[ReadBlobEntry.Metadata] = {}; m_BlobEntries[ReadBlobIndex].Metadata = {}; } m_BlobLookup.erase(It); m_LastAccessTimeUpdateCount++; RemovedBlobs.push_back(ExpiredBlob); Stats.DeletedCount++; } } } if (!RemovedPayloads.empty()) { m_PayloadlogFile.Append(RemovedPayloads); } if (!RemoveMetadatas.empty()) { m_MetadatalogFile.Append(RemoveMetadatas); } } } if (!RemovedBlobs.empty()) { CompactState(); } return new BuildStoreGcCompator(*this, std::move(RemovedBlobs)); } std::vector BuildStore::CreateReferenceCheckers(GcCtx& Ctx) { ZEN_UNUSED(Ctx); ZEN_MEMSCOPE(GetBuildstoreTag()); return {new BuildStoreGcReferenceChecker(*this)}; } std::vector BuildStore::CreateReferenceValidators(GcCtx& Ctx) { ZEN_UNUSED(Ctx); return {}; } std::vector BuildStore::LockState(GcCtx& Ctx) { ZEN_UNUSED(Ctx); std::vector Locks; Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); return Locks; } void BuildStore::ScrubStorage(ScrubContext& ScrubCtx) { ZEN_UNUSED(ScrubCtx); // TODO } GcStorageSize BuildStore::StorageSize() const { GcStorageSize Result; Result.DiskSize = m_MetadataBlockStore.TotalSize(); return Result; } /* ___________ __ \__ ___/___ _______/ |_ 
______ | |_/ __ \ / ___/\ __\/ ___/ | |\ ___/ \___ \ | | \___ \ |____| \___ >____ > |__| /____ > \/ \/ \/ */

#if ZEN_WITH_TESTS

// Stores compressed blobs and reads them back. Each inner scope constructs a fresh
// GcManager/BuildStore on the same root directory, which checks that blobs persist across
// reopen and that more blobs can be added to an existing store.
TEST_CASE("BuildStore.Blobs")
{
    ScopedTemporaryDirectory _;

    BuildStoreConfig Config;
    Config.RootDirectory = _.Path() / "build_store";

    std::vector CompressedBlobsHashes;
    // First session: write 5 blobs, then verify content type, raw hash and decompressed data.
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);

        for (size_t I = 0; I < 5; I++)
        {
            IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
            CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
            CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
            IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
            Payload.SetContentType(ZenContentType::kCompressedBinary);
            Store.PutBlob(CompressedBlobsHashes.back(), Payload);
        }

        for (const IoHash& RawHash : CompressedBlobsHashes)
        {
            IoBuffer Payload = Store.GetBlob(RawHash);
            CHECK(Payload);
            CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
            IoHash VerifyRawHash;
            uint64_t VerifyRawSize;
            CompressedBuffer CompressedBlob =
                CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
            CHECK(CompressedBlob);
            CHECK(VerifyRawHash == RawHash);
            IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
            CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
        }
    }
    // Second session: the first 5 blobs must still be readable after reopening; then add 5 more.
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);

        for (const IoHash& RawHash : CompressedBlobsHashes)
        {
            IoBuffer Payload = Store.GetBlob(RawHash);
            CHECK(Payload);
            CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
            IoHash VerifyRawHash;
            uint64_t VerifyRawSize;
            CompressedBuffer CompressedBlob =
                CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
            CHECK(CompressedBlob);
            CHECK(VerifyRawHash == RawHash);
            IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
            CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
        }

        for (size_t I = 0; I < 5; I++)
        {
            IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7);
            CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
            CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
            IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
            Payload.SetContentType(ZenContentType::kCompressedBinary);
            Store.PutBlob(CompressedBlobsHashes.back(), Payload);
        }
    }
    // Third session: all 10 blobs must be readable.
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);

        for (const IoHash& RawHash : CompressedBlobsHashes)
        {
            IoBuffer Payload = Store.GetBlob(RawHash);
            CHECK(Payload);
            CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
            IoHash VerifyRawHash;
            uint64_t VerifyRawSize;
            CompressedBuffer CompressedBlob =
                CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
            CHECK(CompressedBlob);
            CHECK(VerifyRawHash == RawHash);
            IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
            CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
        }
    }
}

namespace blockstore::testing {
// Writes a CbObject with the blob hash under "rawHash" and each key/value pair as a string field
// inside a nested "values" object, and returns the saved object's buffer as an IoBuffer.
IoBuffer
MakeMetaData(const IoHash& BlobHash, const std::vector>& KeyValues)
{
    CbObjectWriter Writer;
    Writer.AddHash("rawHash"sv, BlobHash);
    Writer.BeginObject("values");
    {
        for (const auto& V : KeyValues)
        {
            Writer.AddString(V.first, V.second);
        }
    }
    Writer.EndObject();  // values
    return Writer.Save().GetBuffer().AsIoBuffer();
};
}  // namespace blockstore::testing

// Exercises metadata storage separately from blob payloads. Covers metadata for hashes that have
// no blob, blobs that have no metadata, adding metadata to existing blobs, and replacing it. The
// store is reopened between steps to check persistence.
TEST_CASE("BuildStore.Metadata")
{
    using namespace blockstore::testing;

    ScopedTemporaryDirectory _;

    BuildStoreConfig Config;
    Config.RootDirectory = _.Path() / "build_store";

    std::vector BlobHashes;
    std::vector MetaPayloads;
    // Put metadata for 5 synthetic hashes (no blob payloads) and read it back.
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);

        for (size_t I = 0; I < 5; I++)
        {
            BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I)));
            MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}}));
            MetaPayloads.back().SetContentType(ZenContentType::kCbObject);
        }
        Store.PutMetadatas(BlobHashes, MetaPayloads);

        std::vector ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
        CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
        for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
        {
            const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]);
            CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash);
        }
    }
    // After reopening, the metadata is still returned and GetBlob finds nothing for those hashes.
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);
        std::vector ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
        CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
        for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
        {
            const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]);
            CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash);
        }
        for (const IoHash& BlobHash : BlobHashes)
        {
            CHECK(!Store.GetBlob(BlobHash));
        }
    }
    std::vector CompressedBlobsHashes;
    // Put 5 blobs without metadata: GetMetadatas returns empty buffers for them.
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);
        for (size_t I = 0; I < 5; I++)
        {
            IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
            CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
            CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
            IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
            Payload.SetContentType(ZenContentType::kCompressedBinary);
            Store.PutBlob(CompressedBlobsHashes.back(), Payload);
        }
        std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
        for (const auto& MetadataIt : MetadataPayloads)
        {
            CHECK(!MetadataIt);
        }
        for (const IoHash& BlobHash : CompressedBlobsHashes)
        {
            IoBuffer Blob = Store.GetBlob(BlobHash);
            CHECK(Blob);
            IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
            CHECK(DecompressedBlob);
            CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash);
        }
    }

    std::vector BlobMetaPayloads;
    // Attach metadata to the existing blobs and read it back.
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);
        for (const IoHash& BlobHash : CompressedBlobsHashes)
        {
            BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
            BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
        }
        Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);

        std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
        CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
        for (size_t I = 0; I < MetadataPayloads.size(); I++)
        {
            const IoBuffer& MetadataPayload = MetadataPayloads[I];
            CHECK_EQ(IoHash::HashBuffer(MetadataPayload), IoHash::HashBuffer(BlobMetaPayloads[I]));
        }
    }

    // After reopening, both metadata and blobs are intact; then replace every metadata object.
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);

        std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
        CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
        for (size_t I = 0; I < MetadataPayloads.size(); I++)
        {
            const IoBuffer& MetadataPayload = MetadataPayloads[I];
            CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I]));
        }

        for (const IoHash& BlobHash : CompressedBlobsHashes)
        {
            IoBuffer Blob = Store.GetBlob(BlobHash);
            CHECK(Blob);
            IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
            CHECK(DecompressedBlob);
            CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash);
        }

        BlobMetaPayloads.clear();
        for (const IoHash& BlobHash : CompressedBlobsHashes)
        {
            BlobMetaPayloads.push_back(
                MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}}));
            BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
        }
        Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
    }
    // After reopening, the replaced metadata is returned and the blobs are unchanged.
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);

        std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
        CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
        for (size_t I = 0; I < MetadataPayloads.size(); I++)
        {
            const IoBuffer& MetadataPayload = MetadataPayloads[I];
            CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I]));
        }

        for (const IoHash& BlobHash : CompressedBlobsHashes)
        {
            IoBuffer Blob = Store.GetBlob(BlobHash);
            CHECK(Blob);
            IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
            CHECK(DecompressedBlob);
            CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash);
        }
    }
}

// Runs GC twice on a reopened store holding 5 blobs with metadata.
// 1. IsDeleteMode = false, expire time one hour in the past: every blob and its metadata must
//    remain.
// 2. Delete mode, expire time one hour in the future: every blob and all metadata must be gone.
TEST_CASE("BuildStore.GC")
{
    using namespace blockstore::testing;

    ScopedTemporaryDirectory _;

    BuildStoreConfig Config;
    Config.RootDirectory = _.Path() / "build_store";

    std::vector CompressedBlobsHashes;
    std::vector BlobMetaPayloads;
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);
        for (size_t I = 0; I < 5; I++)
        {
            IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
            CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
            CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
            IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
            Payload.SetContentType(ZenContentType::kCompressedBinary);
            Store.PutBlob(CompressedBlobsHashes.back(), Payload);
        }
        for (const IoHash& BlobHash : CompressedBlobsHashes)
        {
            BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
            BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
        }
        Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
    }
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);

        // Non-delete pass: nothing may be removed.
        {
            GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
                                                           .CollectSmallObjects = false,
                                                           .IsDeleteMode = false,
                                                           .Verbose = true});
            CHECK(!Result.WasCancelled);
            for (const IoHash& BlobHash : CompressedBlobsHashes)
            {
                IoBuffer Blob = Store.GetBlob(BlobHash);
                CHECK(Blob);
                IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
                CHECK(DecompressedBlob);
                CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash);
            }

            std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
            CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
            for (size_t I = 0; I < MetadataPayloads.size(); I++)
            {
                const IoBuffer& MetadataPayload = MetadataPayloads[I];
                CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I]));
            }
        }
        // Delete pass with every blob expired: blobs and metadata must be gone.
        {
            GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
                                                           .CollectSmallObjects = true,
                                                           .IsDeleteMode = true,
                                                           .Verbose = true});
            CHECK(!Result.WasCancelled);
            for (const IoHash& BlobHash : CompressedBlobsHashes)
            {
                IoBuffer Blob = Store.GetBlob(BlobHash);
                CHECK(!Blob);
            }
            std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
            CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
            for (size_t I = 0; I < MetadataPayloads.size(); I++)
            {
                const IoBuffer& MetadataPayload = MetadataPayloads[I];
                CHECK(!MetadataPayload);
            }
        }
    }
}

// Fills a store whose MaxDiskSpaceLimit is 1 MiB with 64 blobs compressed using
// OodleCompressionLevel::None, and gives blob I a last-access time I minutes in the future. It
// then runs a delete-mode GC whose expire time (one hour ago) expires nothing, so any removal is
// driven only by the disk-size limit, which evicts the least recently accessed (lowest-index)
// blobs first.
TEST_CASE("BuildStore.SizeLimit")
{
    using namespace blockstore::testing;

    ScopedTemporaryDirectory _;

    BuildStoreConfig Config = {.MaxDiskSpaceLimit = 1024u * 1024u};
    Config.RootDirectory = _.Path() / "build_store";

    std::vector CompressedBlobsHashes;
    std::vector BlobMetaPayloads;
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);
        for (size_t I = 0; I < 64; I++)
        {
            IoBuffer Blob = CreateSemiRandomBlob(65537 + I * 7);
            CompressedBuffer CompressedBlob =
                CompressedBuffer::Compress(SharedBuffer(std::move(Blob)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
            CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
            IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
            Payload.SetContentType(ZenContentType::kCompressedBinary);
            Store.PutBlob(CompressedBlobsHashes.back(), Payload);
        }
        for (const IoHash& BlobHash : CompressedBlobsHashes)
        {
            BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
            BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
        }
        Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);

        // Stagger access times so that eviction order follows blob index.
        {
            for (size_t I = 0; I < 64; I++)
            {
                const IoHash& Key = CompressedBlobsHashes[I];
                GcClock::Tick AccessTick = (GcClock::Now() + std::chrono::minutes(I)).time_since_epoch().count();

                Store.SetLastAccessTime(Key, AccessTime(AccessTick));
            }
        }
    }
    {
        GcManager Gc;
        BuildStore Store(Config, Gc);

        {
            GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
                                                           .CollectSmallObjects = true,
                                                           .IsDeleteMode = true,
                                                           .Verbose = true});

            uint32_t DeletedBlobs = 0;

            CHECK(!Result.WasCancelled);
            for (const IoHash& BlobHash : CompressedBlobsHashes)
            {
                IoBuffer Blob = Store.GetBlob(BlobHash);
                if (!Blob)
                {
                    DeletedBlobs++;
                }
                else
                {
                    IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer();
                    CHECK(DecompressedBlob);
                    CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash);
                }
            }
            // Depends on the exact stored sizes. RemoveExpiredData stops evicting once the estimated
            // size drops below MaxDiskSpaceLimit - (MaxDiskSpaceLimit >> 4).
            CHECK(DeletedBlobs == 50);

            std::vector MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
            CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
            for (size_t I = 0; I < MetadataPayloads.size(); I++)
            {
                const IoBuffer& MetadataPayload = MetadataPayloads[I];
                // The evicted blobs are the oldest ones, i.e. indices [0, DeletedBlobs).
                if (I < DeletedBlobs)
                {
                    CHECK(!MetadataPayload);
                }
                else
                {
                    CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I]));
                }
            }
        }
    }
}

// Empty function. NOTE(review): by its name, presumably called from elsewhere so the linker keeps
// this translation unit's tests - confirm.
void
buildstore_forcelink()
{
}

#endif
}  // namespace zen