diff options
| author | Zousar Shaker <[email protected]> | 2025-03-26 17:01:30 -0600 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-26 17:01:30 -0600 |
| commit | 56af48235a5394b2906e9b347d43404394a4e756 (patch) | |
| tree | 05530b3da98773794320e5a2ab507ec879bf6e9d /src/zenstore | |
| parent | Descriptive type conversion messages (diff) | |
| parent | zen build cache service (#318) (diff) | |
| download | zen-56af48235a5394b2906e9b347d43404394a4e756.tar.xz zen-56af48235a5394b2906e9b347d43404394a4e756.zip | |
Merge branch 'main' into zs/ui-show-cook-artifacts
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 1475 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 4 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 21 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/accesstime.h | 47 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/buildstore/buildstore.h | 186 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 1 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacheshared.h | 38 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 4 |
11 files changed, 1738 insertions, 46 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index e976c061d..63c0388fa 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -578,7 +578,7 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons } void -BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback) +BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback) { ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::WriteChunks"); diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp new file mode 100644 index 000000000..8674aab75 --- /dev/null +++ b/src/zenstore/buildstore/buildstore.cpp @@ -0,0 +1,1475 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/buildstore/buildstore.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory/llm.h> +#include <zencore/scopeguard.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> + +#include <zencore/uid.h> +#include <zencore/xxhash.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/compress.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + +namespace zen { +const FLLMTag& +GetBuildstoreTag() +{ + static FLLMTag _("store", FLLMTag("builds")); + + return _; +} + +using namespace std::literals; + +namespace blobstore::impl { + + const std::string BaseName = "builds"; + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + + std::filesystem::path GetBlobIndexPath(const std::filesystem::path& RootDirectory) + { + return RootDirectory / (BaseName + IndexExtension); + } + + std::filesystem::path GetBlobLogPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + LogExtension); } + + std::filesystem::path GetMetaIndexPath(const std::filesystem::path& RootDirectory) + { + return RootDirectory / (BaseName + "_meta" + IndexExtension); + } + + std::filesystem::path GetMetaLogPath(const std::filesystem::path& RootDirectory) + { + return RootDirectory / (BaseName + "_meta" + LogExtension); + } +} // namespace blobstore::impl + +BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) +: m_Config(Config) +, m_Gc(Gc) +, m_LargeBlobStore(m_Gc) +, m_SmallBlobStore(Gc) +, m_MetadataBlockStore() +{ + ZEN_TRACE_CPU("BuildStore::BuildStore"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + try + { + std::filesystem::path BlobLogPath = blobstore::impl::GetBlobLogPath(Config.RootDirectory); + std::filesystem::path MetaLogPath = blobstore::impl::GetMetaLogPath(Config.RootDirectory); + bool IsNew = !(std::filesystem::exists(BlobLogPath) && std::filesystem::exists(MetaLogPath)); + + if (!IsNew) + { + m_BlobLogFlushPosition = ReadPayloadLog(RwLock::ExclusiveLockScope(m_Lock), BlobLogPath, 0); + m_MetaLogFlushPosition = ReadMetadataLog(RwLock::ExclusiveLockScope(m_Lock), MetaLogPath, 0); + } + m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew); + m_SmallBlobStore.Initialize(Config.RootDirectory, + "blob_cas", + m_Config.SmallBlobBlockStoreMaxBlockSize, + m_Config.SmallBlobBlockStoreAlignement, + IsNew); + m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20); + { + BlockStore::BlockIndexSet KnownBlocks; + for (const BlobEntry& Blob : m_BlobEntries) + { + if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) + { + const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; + KnownBlocks.insert(Metadata.Location.BlockIndex); + } + } + m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + } + + m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite); + m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite); + + m_Gc.AddGcReferencer(*this); + m_Gc.AddGcReferenceLocker(*this); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to initialize build store. Reason: '{}'", Ex.what()); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); + } +} + +BuildStore::~BuildStore() +{ + try + { + ZEN_TRACE_CPU("BuildStore::~BuildStore"); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); + Flush(); + m_MetadatalogFile.Close(); + m_PayloadlogFile.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~BuildStore() threw exception: {}", Ex.what()); + } +} + +void +BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) +{ + ZEN_TRACE_CPU("BuildStore::PutBlob"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCompressedBinary); + { + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex BlobIndex = It->second; + if (m_BlobEntries[BlobIndex].Payload) + { + return; + } + } + } + + PayloadEntry Entry; + if (Payload.GetSize() > m_Config.SmallBlobBlockStoreMaxBlockEmbedSize) + { + CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash); + ZEN_UNUSED(Result); + Entry = {.Flags = PayloadEntry::kStandalone}; + } + else + { + CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash); + ZEN_UNUSED(Result); + Entry = {.Flags = 0}; + } + m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); + + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + if (Blob.Payload) + { + m_PayloadEntries[Blob.Payload] = Entry; + } + else + { + Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Entry); + } + Blob.LastAccessTime = GcClock::TickCount(); + } + else + { + PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + // we only remove during GC and compact this then... + m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert({BlobHash, NewBlobIndex}); + } +} + +IoBuffer +BuildStore::GetBlob(const IoHash& BlobHash) +{ + ZEN_TRACE_CPU("BuildStore::GetBlob"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + RwLock::SharedLockScope Lock(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + Blob.LastAccessTime = GcClock::TickCount(); + if (Blob.Payload) + { + const PayloadEntry& Entry = m_PayloadEntries[Blob.Payload]; + const bool IsStandalone = (Entry.Flags & PayloadEntry::kStandalone) != 0; + Lock.ReleaseNow(); + + IoBuffer Chunk; + if (IsStandalone) + { + ZEN_TRACE_CPU("GetLarge"); + Chunk = m_LargeBlobStore.FindChunk(BlobHash); + } + else + { + ZEN_TRACE_CPU("GetSmall"); + Chunk = m_SmallBlobStore.FindChunk(BlobHash); + } + if (Chunk) + { + Chunk.SetContentType(ZenContentType::kCompressedBinary); + return Chunk; + } + else + { + ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? "on disk" : "in block"); + } + } + } + return {}; +} + +std::vector<BuildStore::BlobExistsResult> +BuildStore::BlobsExists(std::span<const IoHash> BlobHashes) +{ + ZEN_TRACE_CPU("BuildStore::BlobsExists"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + std::vector<BuildStore::BlobExistsResult> Result; + Result.reserve(BlobHashes.size()); + RwLock::SharedLockScope _(m_Lock); + for (const IoHash& BlobHash : BlobHashes) + { + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + bool HasPayload = !!Blob.Payload; + bool HasMetadata = !!Blob.Metadata; + Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata}); + } + else + { + Result.push_back({}); + } + } + return Result; +} + +void +BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas) +{ + ZEN_TRACE_CPU("BuildStore::PutMetadatas"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + size_t WriteBlobIndex = 0; + m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span<BlockStoreLocation> Locations) { + RwLock::ExclusiveLockScope _(m_Lock); + for (size_t LocationIndex = 0; LocationIndex < Locations.size(); LocationIndex++) + { + const IoBuffer& Data = MetaDatas[WriteBlobIndex]; + const IoHash& BlobHash = BlobHashes[WriteBlobIndex]; + const BlockStoreLocation& Location = Locations[LocationIndex]; + + MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0}; + m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); + + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + if (Blob.Metadata) + { + m_MetadataEntries[Blob.Metadata] = Entry; + } + else + { + Blob.Metadata = MetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Entry); + } + Blob.LastAccessTime = GcClock::TickCount(); + } + else + { + MetadataIndex NewMetadataIndex = MetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert({BlobHash, NewBlobIndex}); + } + WriteBlobIndex++; + if (m_TrackedCacheKeys) + { + m_TrackedCacheKeys->insert(BlobHash); + } + } + }); +} + +std::vector<IoBuffer> +BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool) +{ + ZEN_TRACE_CPU("BuildStore::GetMetadatas"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + std::vector<BlockStoreLocation> MetaLocations; + std::vector<size_t> MetaLocationResultIndexes; + MetaLocations.reserve(BlobHashes.size()); + MetaLocationResultIndexes.reserve(BlobHashes.size()); + tsl::robin_set<uint32_t> ReferencedBlocks; + + std::vector<IoBuffer> Result; + std::vector<ZenContentType> ResultContentTypes; + Result.resize(BlobHashes.size()); + ResultContentTypes.resize(BlobHashes.size(), ZenContentType::kUnknownContentType); + { + RwLock::SharedLockScope _(m_Lock); + for (size_t Index = 0; Index < BlobHashes.size(); Index++) + { + const IoHash& BlobHash = BlobHashes[Index]; + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& ExistingBlobEntry = m_BlobEntries[ExistingBlobIndex]; + if (ExistingBlobEntry.Metadata) + { + const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata]; + MetaLocations.push_back(ExistingMetadataEntry.Location); + MetaLocationResultIndexes.push_back(Index); + ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex); + ResultContentTypes[Index] = ExistingMetadataEntry.ContentType; + } + ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::Tick()); + } + } + } + + auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { + if (ChunkIndexes.size() < 4) + { + for (size_t ChunkIndex : ChunkIndexes) + { + IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]); + if (Chunk) + { + size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; + Result[ResultIndex] = std::move(Chunk); + } + } + return true; + } + return m_MetadataBlockStore.IterateBlock( + MetaLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + if (Data != nullptr) + { + size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; + Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size); + } + return true; + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; + Result[ResultIndex] = File.GetChunk(Offset, Size); + return true; + }, + 8u * 1024u); + }; + + if (!MetaLocations.empty()) + { + Latch WorkLatch(1); + + m_MetadataBlockStore.IterateChunks(MetaLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) -> bool { + ZEN_UNUSED(BlockIndex); + if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1) + { + return DoOneBlock(ChunkIndexes); + } + else + { + ZEN_ASSERT(OptionalWorkerPool != nullptr); + WorkLatch.AddCount(1); + try + { + OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + try + { + DoOneBlock(ChunkIndexes); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); + } + }); + } + catch (const std::exception& Ex) + { + WorkLatch.CountDown(); + ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); + } + return true; + } + }); + + WorkLatch.CountDown(); + WorkLatch.Wait(); + } + for (size_t Index = 0; Index < Result.size(); Index++) + { + if (Result[Index]) + { + Result[Index].SetContentType(ResultContentTypes[Index]); + } + } + return Result; +} + +void +BuildStore::Flush() +{ + ZEN_TRACE_CPU("BuildStore::Flush"); + try + { + m_LargeBlobStore.Flush(); + m_SmallBlobStore.Flush(); + m_MetadataBlockStore.Flush(false); + + m_PayloadlogFile.Flush(); + m_MetadatalogFile.Flush(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("BuildStore::Flush failed. Reason: {}", Ex.what()); + } +} + +void +BuildStore::CompactState() +{ + ZEN_TRACE_CPU("BuildStore::CompactState"); + + std::vector<BlobEntry> BlobEntries; + std::vector<PayloadEntry> PayloadEntries; + std::vector<MetadataEntry> MetadataEntries; + + tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> BlobLookup; + + RwLock::ExclusiveLockScope _(m_Lock); + const size_t EntryCount = m_BlobLookup.size(); + BlobLookup.reserve(EntryCount); + const size_t PayloadCount = m_PayloadEntries.size(); + PayloadEntries.reserve(PayloadCount); + const size_t MetadataCount = m_MetadataEntries.size(); + MetadataEntries.reserve(MetadataCount); + + for (auto LookupIt : m_BlobLookup) + { + const IoHash& BlobHash = LookupIt.first; + const BlobIndex ReadBlobIndex = LookupIt.second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + + const BlobIndex WriteBlobIndex(gsl::narrow<uint32_t>(BlobEntries.size())); + BlobEntries.push_back(ReadBlobEntry); + BlobEntry& WriteBlobEntry = BlobEntries.back(); + + if (WriteBlobEntry.Payload) + { + const PayloadEntry& ReadPayloadEntry = m_PayloadEntries[ReadBlobEntry.Payload]; + WriteBlobEntry.Payload = PayloadIndex(gsl::narrow<uint32_t>(PayloadEntries.size())); + PayloadEntries.push_back(ReadPayloadEntry); + } + if (ReadBlobEntry.Metadata) + { + const MetadataEntry& ReadMetadataEntry = m_MetadataEntries[ReadBlobEntry.Metadata]; + WriteBlobEntry.Metadata = MetadataIndex(gsl::narrow<uint32_t>(MetadataEntries.size())); + MetadataEntries.push_back(ReadMetadataEntry); + } + + BlobLookup.insert({BlobHash, WriteBlobIndex}); + } + m_BlobEntries.swap(BlobEntries); + m_PayloadEntries.swap(PayloadEntries); + m_MetadataEntries.swap(MetadataEntries); + m_BlobLookup.swap(BlobLookup); +} + +uint64_t +BuildStore::ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + ZEN_TRACE_CPU("BuildStore::ReadPayloadLog"); + if (!std::filesystem::is_regular_file(LogPath)) + { + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read build store '{}' payload log containing {} entries in {}", + LogPath, + LogEntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<PayloadDiskEntry> CasLog; + if (!CasLog.IsValid(LogPath)) + { + std::filesystem::remove(LogPath); + return 0; + } + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full payload log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const PayloadDiskEntry& Record) { + std::string InvalidEntryReason; + if (Record.Entry.Flags & PayloadEntry::kTombStone) + { + // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation + m_BlobLookup.erase(Record.BlobHash); + return; + } + + if (!ValidatePayloadDiskEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid payload entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; + if (ExistingBlob.Payload) + { + const PayloadIndex ExistingPayloadIndex = ExistingBlob.Payload; + m_PayloadEntries[ExistingPayloadIndex] = Record.Entry; + } + else + { + const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Record.Entry); + ExistingBlob.Payload = NewPayloadIndex; + } + } + else + { + const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Record.Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::Tick())}); + m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); + } + }, + SkipEntryCount); + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid payload entries in '{}'", InvalidEntryCount, LogPath); + } + + return LogEntryCount; +} + +uint64_t +BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + ZEN_TRACE_CPU("BuildStore::ReadMetadataLog"); + if (!std::filesystem::is_regular_file(LogPath)) + { + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read build store '{}' metadata log containing {} entries in {}", + LogPath, + LogEntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<MetadataDiskEntry> CasLog; + if (!CasLog.IsValid(LogPath)) + { + std::filesystem::remove(LogPath); + return 0; + } + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full metadata log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const MetadataDiskEntry& Record) { + std::string InvalidEntryReason; + if (Record.Entry.Flags & MetadataEntry::kTombStone) + { + // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation + m_BlobLookup.erase(Record.BlobHash); + return; + } + + if (!ValidateMetadataDiskEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid metadata entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; + if (ExistingBlob.Metadata) + { + const MetadataIndex ExistingMetadataIndex = ExistingBlob.Metadata; + m_MetadataEntries[ExistingMetadataIndex] = Record.Entry; + } + else + { + const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Record.Entry); + ExistingBlob.Metadata = NewMetadataIndex; + } + } + else + { + const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Record.Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::Tick())}); + m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); + } + }, + SkipEntryCount); + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid metadata entries in '{}'", InvalidEntryCount, LogPath); + } + + return LogEntryCount; +} + +bool +BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason) +{ + if (Entry.BlobHash == IoHash::Zero) + { + OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Flags & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.Flags, Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Flags & PayloadEntry::kTombStone) + { + return true; + } + if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0 || Entry.Entry.Reserved3 != 0) + { + OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + return true; +} + +bool +BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason) +{ + if (Entry.BlobHash == IoHash::Zero) + { + OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Location.Size == 0) + { + OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.Location.Size); + return false; + } + if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0) + { + OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Flags & MetadataEntry::kTombStone) + { + return true; + } + if (Entry.Entry.ContentType == ZenContentType::kCOUNT) + { + OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0) + { + OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + return true; +} + +class BuildStoreGcReferenceChecker : public GcReferenceChecker +{ +public: + BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {} + virtual std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); + } + + virtual void PreCache(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } + + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Builds::UpdateLockedState"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + m_References.reserve(m_Store.m_BlobLookup.size()); + for (const auto& It : m_Store.m_BlobLookup) + { + const BuildStore::BlobIndex ExistingBlobIndex = It.second; + if (m_Store.m_BlobEntries[ExistingBlobIndex].Payload) + { + m_References.push_back(It.first); + } + } + FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore"), m_References); + } + + virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override + { + ZEN_UNUSED(Ctx); + ZEN_TRACE_CPU("Builds::GetUnusedReferences"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + size_t InitialCount = IoCids.size(); + size_t UsedCount = InitialCount; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: buildstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + "buildstore", + UsedCount, + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); + UsedCount = IoCids.size() - UnusedReferences.size(); + return UnusedReferences; + } + +private: + BuildStore& m_Store; + std::vector<IoHash> m_References; +}; + +std::string +BuildStore::GetGcName(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string()); +} + +class BuildStoreGcCompator : public GcStoreCompactor +{ + using BlobEntry = BuildStore::BlobEntry; + using PayloadEntry = BuildStore::PayloadEntry; + using MetadataEntry = BuildStore::MetadataEntry; + using MetadataDiskEntry = BuildStore::MetadataDiskEntry; + using BlobIndex = BuildStore::BlobIndex; + using PayloadIndex = BuildStore::PayloadIndex; + using MetadataIndex = BuildStore::MetadataIndex; + +public: + BuildStoreGcCompator(BuildStore& Store, std::vector<IoHash>&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {} + + virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override + { + ZEN_UNUSED(ClaimDiskReserveCallback); + ZEN_TRACE_CPU("Builds::CompactStore"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}", + m_Store.m_Config.RootDirectory, + NiceBytes(Stats.RemovedDisk), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + if (!m_RemovedBlobs.empty()) + { + if (Ctx.Settings.CollectSmallObjects) + { + m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique<HashSet>(); }); + auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); }); + + BlockStore::BlockUsageMap BlockUsage; + { + RwLock::SharedLockScope __(m_Store.m_Lock); + + for (auto LookupIt : m_Store.m_BlobLookup) + { + const BlobIndex ReadBlobIndex = LookupIt.second; + const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex]; + + if (ReadBlobEntry.Metadata) + { + const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata]; + + uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex; + uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement); + + if (auto BlockUsageIt = BlockUsage.find(BlockIndex); BlockUsageIt != BlockUsage.end()) + { + BlockStore::BlockUsageInfo& Info = BlockUsageIt.value(); + Info.EntryCount++; + Info.DiskUsage += ChunkSize; + } + else + { + BlockUsage.insert_or_assign(BlockIndex, + BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); + } + } + } + } + + BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90); + BlockStoreCompactState BlockCompactState; + std::vector<IoHash> BlockCompactStateKeys; + BlockCompactState.IncludeBlocks(BlocksToCompact); + + if (BlocksToCompact.size() > 0) + { + { + RwLock::SharedLockScope ___(m_Store.m_Lock); + for (const auto& Entry : m_Store.m_BlobLookup) + { + BlobIndex Index = Entry.second; + + if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) + { + if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location)) + { + BlockCompactStateKeys.push_back(Entry.first); + } + } + } + } + + if (Ctx.Settings.IsDeleteMode) + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks", + m_Store.m_Config.RootDirectory, + BlocksToCompact.size()); + } + + m_Store.m_MetadataBlockStore.CompactBlocks( + BlockCompactState, + m_Store.m_Config.MetadataBlockStoreAlignement, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + std::vector<MetadataDiskEntry> MovedEntries; + MovedEntries.reserve(MovedArray.size()); + RwLock::ExclusiveLockScope _(m_Store.m_Lock); + for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) + { + size_t ChunkIndex = Moved.first; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + + ZEN_ASSERT(m_Store.m_TrackedCacheKeys); + if (m_Store.m_TrackedCacheKeys->contains(Key)) + { + continue; + } + + if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end()) + { + const BlobIndex Index = It->second; + + if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) + { + m_Store.m_MetadataEntries[Meta].Location = Moved.second; + MovedEntries.push_back( + MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key}); + } + } + } + m_Store.m_MetadatalogFile.Append(MovedEntries); + Stats.RemovedDisk += FreedDiskSpace; + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + return true; + }, + ClaimDiskReserveCallback, + fmt::format("GCV2: buildstore [COMPACT] '{}': ", m_Store.m_Config.RootDirectory)); + } + else + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks", + m_Store.m_Config.RootDirectory, + BlocksToCompact.size()); + } + } + } + } + } + } + + virtual std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); + } + +private: + BuildStore& m_Store; + const std::vector<IoHash> m_RemovedBlobs; +}; + +GcStoreCompactor* +BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) +{ + ZEN_TRACE_CPU("Builds::RemoveExpiredData"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: buildstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", + m_Config.RootDirectory, + Stats.CheckedCount, + Stats.FoundCount, + Stats.DeletedCount, + NiceBytes(Stats.FreedMemory), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + }); + + const GcClock::Tick ExpireTicks = Ctx.Settings.BuildStoreExpireTime.time_since_epoch().count(); + + std::vector<IoHash> ExpiredBlobs; + { + RwLock::SharedLockScope __(m_Lock); + for (const auto& It : m_BlobLookup) + { + const BlobIndex ReadBlobIndex = It.second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + + const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; + if (AccessTick < ExpireTicks) + { + ExpiredBlobs.push_back(It.first); + } + } + Stats.CheckedCount += m_BlobLookup.size(); + Stats.FoundCount += ExpiredBlobs.size(); + } + + std::vector<IoHash> RemovedBlobs; + if (!ExpiredBlobs.empty()) + { + if (Ctx.Settings.IsDeleteMode) + { + RemovedBlobs.reserve(ExpiredBlobs.size()); + + std::vector<PayloadDiskEntry> RemovedPayloads; + std::vector<MetadataDiskEntry> RemoveMetadatas; + + RwLock::ExclusiveLockScope __(m_Lock); + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + + for (const IoHash& ExpiredBlob : ExpiredBlobs) + { + if (auto It = m_BlobLookup.find(ExpiredBlob); It != m_BlobLookup.end()) + { + const BlobIndex ReadBlobIndex = It->second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + + const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; + + if (AccessTick < ExpireTicks) + { + if (ReadBlobEntry.Payload) + { + RemovedPayloads.push_back( + PayloadDiskEntry{.Entry = m_PayloadEntries[ReadBlobEntry.Payload], .BlobHash = ExpiredBlob}); + RemovedPayloads.back().Entry.Flags |= PayloadEntry::kTombStone; + m_PayloadEntries[ReadBlobEntry.Payload] = {}; + m_BlobEntries[ReadBlobIndex].Payload = {}; + } + if (ReadBlobEntry.Metadata) + { + RemoveMetadatas.push_back( + MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob}); + RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; + m_MetadataEntries[ReadBlobEntry.Metadata] = {}; + m_BlobEntries[ReadBlobIndex].Metadata = {}; + } + + m_BlobLookup.erase(It); + + RemovedBlobs.push_back(ExpiredBlob); + Stats.DeletedCount++; + } + } + } + if (!RemovedPayloads.empty()) + { + m_PayloadlogFile.Append(RemovedPayloads); + } + if (!RemoveMetadatas.empty()) + { + m_MetadatalogFile.Append(RemoveMetadatas); + } + } + } + + if (!RemovedBlobs.empty()) + { + CompactState(); + } + + return new BuildStoreGcCompator(*this, std::move(RemovedBlobs)); +} + +std::vector<GcReferenceChecker*> +BuildStore::CreateReferenceCheckers(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + ZEN_MEMSCOPE(GetBuildstoreTag()); + return {new BuildStoreGcReferenceChecker(*this)}; +} + +std::vector<GcReferenceValidator*> +BuildStore::CreateReferenceValidators(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + return {}; +} + +std::vector<RwLock::SharedLockScope> +BuildStore::LockState(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); + return Locks; +} + +/* + ___________ __ + \__ ___/___ _______/ |_ ______ + | |_/ __ \ / ___/\ __\/ ___/ + | |\ ___/ \___ \ | | \___ \ + |____| \___ >____ > |__| /____ > + \/ \/ \/ +*/ + +#if ZEN_WITH_TESTS + +TEST_CASE("BuildStore.Blobs") +{ + ScopedTemporaryDirectory _; + + BuildStoreConfig Config; + Config.RootDirectory = _.Path() / "build_store"; + + std::vector<IoHash> CompressedBlobsHashes; + { + GcManager Gc; + BuildStore Store(Config, Gc); + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + + for (const IoHash& RawHash : CompressedBlobsHashes) + { + IoBuffer Payload = Store.GetBlob(RawHash); + CHECK(Payload); + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (const IoHash& RawHash : CompressedBlobsHashes) + { + IoBuffer Payload = Store.GetBlob(RawHash); + CHECK(Payload); + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (const IoHash& RawHash : CompressedBlobsHashes) + { + IoBuffer Payload = Store.GetBlob(RawHash); + CHECK(Payload); + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + } +} + +namespace blockstore::testing { + IoBuffer MakeMetaData(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues) + { + CbObjectWriter Writer; + Writer.AddHash("rawHash"sv, BlobHash); + Writer.BeginObject("values"); + { + for (const auto& V : KeyValues) + { + Writer.AddString(V.first, V.second); + } + } + Writer.EndObject(); // values + return Writer.Save().GetBuffer().AsIoBuffer(); + }; + +} // namespace blockstore::testing + +TEST_CASE("BuildStore.Metadata") +{ + using namespace blockstore::testing; + + ScopedTemporaryDirectory _; + + BuildStoreConfig Config; + Config.RootDirectory = _.Path() / "build_store"; + + std::vector<IoHash> BlobHashes; + std::vector<IoBuffer> MetaPayloads; + { + GcManager Gc; + BuildStore Store(Config, Gc); + + for (size_t I = 0; I < 5; I++) + { + BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I))); + MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); + MetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(BlobHashes, MetaPayloads); + + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); + for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) + { + const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]); + CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash); + } + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); + for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) + { + const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]); + CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash); + } + for (const IoHash& BlobHash : BlobHashes) + { + CHECK(!Store.GetBlob(BlobHash)); + } + } + std::vector<IoHash> CompressedBlobsHashes; + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + for (const auto& MetadataIt : MetadataPayloads) + { + CHECK(!MetadataIt); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); + } + } + + std::vector<IoBuffer> BlobMetaPayloads; + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK_EQ(IoHash::HashBuffer(MetadataPayload), IoHash::HashBuffer(BlobMetaPayloads[I])); + } + } + + { + GcManager Gc; + BuildStore Store(Config, Gc); + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); + } + + BlobMetaPayloads.clear(); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + BlobMetaPayloads.push_back( + MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}})); + BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); + } + } +} + +TEST_CASE("BuildStore.GC") +{ + using namespace blockstore::testing; + + ScopedTemporaryDirectory _; + + BuildStoreConfig Config; + Config.RootDirectory = _.Path() / "build_store"; + + std::vector<IoHash> CompressedBlobsHashes; + std::vector<IoBuffer> BlobMetaPayloads; + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + + { + GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1), + .CollectSmallObjects = false, + .IsDeleteMode = false, + .Verbose = true}); + CHECK(!Result.WasCancelled); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash); + } + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); + } + } + { + GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() + std::chrono::hours(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .Verbose = true}); + CHECK(!Result.WasCancelled); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(!Blob); + } + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(!MetadataPayload); + } + } + } +} + +void +buildstore_forcelink() +{ +} + +#endif + +} // namespace zen diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 2be0542db..b64bc26dd 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -226,7 +226,7 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) } std::vector<CasStore::InsertResult> -CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes) +CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes) { ZEN_MEMSCOPE(GetCasContainerTag()); @@ -323,7 +323,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) } bool -CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, +CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 07e620086..2eb4c233a 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -52,11 +52,11 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore ~CasContainerStrategy(); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); - std::vector<CasStore::InsertResult> InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes); + std::vector<CasStore::InsertResult> InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(HashKeySet& InOutChunks); - bool IterateChunks(std::span<IoHash> ChunkHashes, + bool IterateChunks(std::span<const IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 7ac10d613..fe5ae284b 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -1081,7 +1081,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size()); { ZEN_TRACE_CPU("GcV2::LockReferencers"); - // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore/BuildStore) until // we delete the ReferenceLockers Latch WorkLeft(1); { @@ -1108,7 +1108,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_TRACE_CPU("GcV2::UpdateLockedState"); // Locking all references checkers so we have a steady state of which references are used - // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore/BuildStore) until // we delete the ReferenceCheckers Latch WorkLeft(1); @@ -1739,6 +1739,7 @@ GcScheduler::AppendGCLog(std::string_view Id, GcClock::TimePoint StartTime, cons { Writer << "CacheExpireTime"sv << ToDateTime(Settings.CacheExpireTime); Writer << "ProjectStoreExpireTime"sv << ToDateTime(Settings.ProjectStoreExpireTime); + Writer << "BuildStoreExpireTime"sv << ToDateTime(Settings.BuildStoreExpireTime); Writer << "CollectSmallObjects"sv << Settings.CollectSmallObjects; Writer << "IsDeleteMode"sv << Settings.IsDeleteMode; Writer << "SkipCidDelete"sv << Settings.SkipCidDelete; @@ -1940,6 +1941,7 @@ GcScheduler::SchedulerThread() std::chrono::seconds LightweightGcInterval = m_Config.LightweightInterval; std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration; std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration; + std::chrono::seconds MaxBuildStoreDuration = m_Config.MaxBuildStoreDuration; uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit; bool SkipCid = false; GcVersion UseGCVersion = m_Config.UseGCVersion; @@ -1975,6 +1977,10 @@ GcScheduler::SchedulerThread() { MaxProjectStoreDuration = TriggerParams.MaxProjectStoreDuration; } + if (TriggerParams.MaxBuildStoreDuration != std::chrono::seconds::max()) + { + MaxBuildStoreDuration = TriggerParams.MaxBuildStoreDuration; + } if (TriggerParams.DiskSizeSoftLimit != 0) { DiskSizeSoftLimit = TriggerParams.DiskSizeSoftLimit; @@ -2046,6 +2052,8 @@ GcScheduler::SchedulerThread() MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration; GcClock::TimePoint ProjectStoreExpireTime = MaxProjectStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxProjectStoreDuration; + GcClock::TimePoint BuildStoreExpireTime = + MaxBuildStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxBuildStoreDuration; const GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); @@ -2102,6 +2110,10 @@ GcScheduler::SchedulerThread() { ProjectStoreExpireTime = SizeBasedExpireTime; } + if (SizeBasedExpireTime > BuildStoreExpireTime) + { + BuildStoreExpireTime = SizeBasedExpireTime; + } } std::chrono::seconds RemainingTimeUntilGc = @@ -2227,6 +2239,7 @@ GcScheduler::SchedulerThread() bool GcSuccess = CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, + BuildStoreExpireTime, DoDelete, CollectSmallObjects, SkipCid, @@ -2333,6 +2346,7 @@ GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds Time bool GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, + const GcClock::TimePoint& BuildStoreExpireTime, bool Delete, bool CollectSmallObjects, bool SkipCid, @@ -2416,6 +2430,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcSettings Settings = {.CacheExpireTime = CacheExpireTime, .ProjectStoreExpireTime = ProjectStoreExpireTime, + .BuildStoreExpireTime = BuildStoreExpireTime, .CollectSmallObjects = CollectSmallObjects, .IsDeleteMode = Delete, .SkipCidDelete = SkipCid, @@ -2447,6 +2462,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, } SB.Append(fmt::format(" Cache cutoff time: {}\n", Settings.CacheExpireTime)); SB.Append(fmt::format(" Project store cutoff time: {}\n", Settings.ProjectStoreExpireTime)); + SB.Append(fmt::format(" Build store cutoff time: {}\n", Settings.BuildStoreExpireTime)); }; { @@ -2552,6 +2568,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, if (Delete) { GcClock::TimePoint KeepRangeStart = Min(CacheExpireTime, ProjectStoreExpireTime); + KeepRangeStart = Min(KeepRangeStart, BuildStoreExpireTime); m_LastGcExpireTime = KeepRangeStart; std::unique_lock Lock(m_GcMutex); m_DiskUsageWindow.KeepRange(KeepRangeStart.time_since_epoch().count(), GcClock::Duration::max().count()); diff --git a/src/zenstore/include/zenstore/accesstime.h b/src/zenstore/include/zenstore/accesstime.h new file mode 100644 index 000000000..a28dc908b --- /dev/null +++ b/src/zenstore/include/zenstore/accesstime.h @@ -0,0 +1,47 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenstore/gc.h> + +#include <gsl/gsl-lite.hpp> + +namespace zen { + +// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch +struct AccessTime +{ + explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {} + AccessTime& operator=(GcClock::Tick Tick) noexcept + { + SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed); + return *this; + } + operator GcClock::Tick() const noexcept + { + return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed))) + .count(); + } + + AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime& operator=(AccessTime&& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + AccessTime& operator=(const AccessTime& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + +private: + static uint32_t ToSeconds(GcClock::Tick Tick) + { + return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count()); + } + std::atomic_uint32_t SecondsSinceEpoch; +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 97357e5cb..0c72a13aa 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -156,7 +156,7 @@ public: void WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback); typedef std::function<void(std::span<BlockStoreLocation> Locations)> WriteChunksCallback; - void WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback); + void WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback); IoBuffer TryGetChunk(const BlockStoreLocation& Location) const; void Flush(bool ForceNewBlock); diff --git a/src/zenstore/include/zenstore/buildstore/buildstore.h b/src/zenstore/include/zenstore/buildstore/buildstore.h new file mode 100644 index 000000000..302af5f9c --- /dev/null +++ b/src/zenstore/include/zenstore/buildstore/buildstore.h @@ -0,0 +1,186 @@ + +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/blockstore.h> + +#include <zencore/iohash.h> +#include <zenstore/accesstime.h> +#include <zenstore/caslog.h> +#include <zenstore/gc.h> +#include "../compactcas.h" +#include "../filecas.h" + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +struct BuildStoreConfig +{ + std::filesystem::path RootDirectory; + uint32_t SmallBlobBlockStoreMaxBlockSize = 256 * 1024 * 1024; + uint64_t SmallBlobBlockStoreMaxBlockEmbedSize = 1 * 1024 * 1024; + uint32_t SmallBlobBlockStoreAlignement = 16; + uint32_t MetadataBlockStoreMaxBlockSize = 64 * 1024 * 1024; + uint32_t MetadataBlockStoreAlignement = 8; +}; + +class BuildStore : public GcReferencer, public GcReferenceLocker //, public GcStorage +{ +public: + explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc); + virtual ~BuildStore(); + + void PutBlob(const IoHash& BlobHashes, const IoBuffer& Payload); + IoBuffer GetBlob(const IoHash& BlobHashes); + + struct BlobExistsResult + { + bool HasBody = 0; + bool HasMetadata = 0; + }; + + std::vector<BlobExistsResult> BlobsExists(std::span<const IoHash> BlobHashes); + + void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas); + std::vector<IoBuffer> GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool); + + void Flush(); + +private: + void CompactState(); + + uint64_t ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount); + uint64_t ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount); + + //////// GcReferencer + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; + + //////// GcReferenceLocker + virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; + +#pragma pack(push) +#pragma pack(1) + struct PayloadEntry + { + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value + static const uint8_t kStandalone = 0x20u; // This payload is stored as a standalone value + + uint8_t Flags = 0; + uint8_t Reserved1 = 0; + uint8_t Reserved2 = 0; + uint8_t Reserved3 = 0; + }; + static_assert(sizeof(PayloadEntry) == 4); + + struct PayloadDiskEntry + { + PayloadEntry Entry; // 4 bytes + IoHash BlobHash; // 20 bytes + }; + static_assert(sizeof(PayloadDiskEntry) == 24); + + struct MetadataEntry + { + BlockStoreLocation Location; // 12 bytes + + ZenContentType ContentType = ZenContentType::kCOUNT; // 1 byte + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value + uint8_t Flags = 0; // 1 byte + + uint8_t Reserved1 = 0; + uint8_t Reserved2 = 0; + }; + static_assert(sizeof(MetadataEntry) == 16); + + struct MetadataDiskEntry + { + MetadataEntry Entry; // 16 bytes + IoHash BlobHash; // 20 bytes + uint8_t Reserved1 = 0; + uint8_t Reserved2 = 0; + uint8_t Reserved3 = 0; + uint8_t Reserved4 = 0; + }; + static_assert(sizeof(MetadataDiskEntry) == 40); + +#pragma pack(pop) + + static bool ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason); + static bool ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason); + + struct PayloadIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + PayloadIndex() = default; + explicit PayloadIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const PayloadIndex& Other) const = default; + }; + + struct MetadataIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + MetadataIndex() = default; + explicit MetadataIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const MetadataIndex& Other) const = default; + }; + + struct BlobIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + BlobIndex() = default; + explicit BlobIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const BlobIndex& Other) const = default; + }; + + struct BlobEntry + { + PayloadIndex Payload; + MetadataIndex Metadata; + AccessTime LastAccessTime; + }; + static_assert(sizeof(BlobEntry) == 12); + + const BuildStoreConfig m_Config; + GcManager& m_Gc; + + RwLock m_Lock; + + std::vector<PayloadEntry> m_PayloadEntries; + std::vector<MetadataEntry> m_MetadataEntries; + + std::vector<BlobEntry> m_BlobEntries; + tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> m_BlobLookup; + + FileCasStrategy m_LargeBlobStore; + CasContainerStrategy m_SmallBlobStore; + BlockStore m_MetadataBlockStore; + + TCasLogFile<PayloadDiskEntry> m_PayloadlogFile; + TCasLogFile<MetadataDiskEntry> m_MetadatalogFile; + uint64_t m_BlobLogFlushPosition = 0; + uint64_t m_MetaLogFlushPosition = 0; + + std::unique_ptr<HashSet> m_TrackedCacheKeys; + + friend class BuildStoreGcReferenceChecker; + friend class BuildStoreGcReferencePruner; + friend class BuildStoreGcCompator; +}; + +void buildstore_forcelink(); + +} // namespace zen diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 05400c784..5a51718d3 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -5,6 +5,7 @@ #include "cacheshared.h" #include <zencore/stats.h> +#include <zenstore/accesstime.h> #include <zenstore/blockstore.h> #include <zenstore/caslog.h> diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index 521c78bb1..ef1b803de 100644 --- a/src/zenstore/include/zenstore/cache/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h @@ -72,42 +72,4 @@ struct CacheContentStats bool IsKnownBadBucketName(std::string_view BucketName); bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer); -////////////////////////////////////////////////////////////////////////// - -// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch -struct AccessTime -{ - explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {} - AccessTime& operator=(GcClock::Tick Tick) noexcept - { - SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed); - return *this; - } - operator GcClock::Tick() const noexcept - { - return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed))) - .count(); - } - - AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} - AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} - AccessTime& operator=(AccessTime&& Rhs) noexcept - { - SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - AccessTime& operator=(const AccessTime& Rhs) noexcept - { - SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - -private: - static uint32_t ToSeconds(GcClock::Tick Tick) - { - return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count()); - } - std::atomic_uint32_t SecondsSinceEpoch; -}; - } // namespace zen diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 3daae0a93..67aadef71 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -55,6 +55,7 @@ struct GcSettings { GcClock::TimePoint CacheExpireTime = GcClock::Now(); GcClock::TimePoint ProjectStoreExpireTime = GcClock::Now(); + GcClock::TimePoint BuildStoreExpireTime = GcClock::Now(); bool CollectSmallObjects = false; bool IsDeleteMode = false; bool SkipCidDelete = false; @@ -412,6 +413,7 @@ struct GcSchedulerConfig std::chrono::seconds Interval{}; std::chrono::seconds MaxCacheDuration{86400}; std::chrono::seconds MaxProjectStoreDuration{604800}; + std::chrono::seconds MaxBuildStoreDuration{604800}; bool CollectSmallObjects = true; bool Enabled = true; uint64_t DiskReserveSize = 1ul << 28; @@ -496,6 +498,7 @@ public: bool CollectSmallObjects = false; std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max(); std::chrono::seconds MaxProjectStoreDuration = std::chrono::seconds::max(); + std::chrono::seconds MaxBuildStoreDuration = std::chrono::seconds::max(); uint64_t DiskSizeSoftLimit = 0; bool SkipCid = false; bool SkipDelete = false; @@ -528,6 +531,7 @@ private: void SchedulerThread(); bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, + const GcClock::TimePoint& BuildStoreExpireTime, bool Delete, bool CollectSmallObjects, bool SkipCid, |