diff options
| author | Florent Devillechabrol <[email protected]> | 2025-04-02 10:38:02 -0700 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-04-02 10:38:02 -0700 |
| commit | 486a22ad2c61bc1616d8745e0077eb320089bfec (patch) | |
| tree | 665d5c9002cd97c04ddffeaf211fcf8e55d01dce /src/zenstore | |
| parent | Fixed missing trailing quote when converting binary data from compact binary ... (diff) | |
| parent | added --find-max-block-count option to builds upload (#337) (diff) | |
| download | zen-486a22ad2c61bc1616d8745e0077eb320089bfec.tar.xz zen-486a22ad2c61bc1616d8745e0077eb320089bfec.zip | |
Merge branch 'main' into fd-fix-binary-json
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 20 | ||||
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 1496 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 76 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/caslog.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 34 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 4 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 78 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 41 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/accesstime.h | 47 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/buildstore/buildstore.h | 186 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 1 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacheshared.h | 38 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 4 | ||||
| -rw-r--r-- | src/zenstore/workspaces.cpp | 26 |
16 files changed, 1879 insertions, 178 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index e976c061d..7cc09be15 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -85,7 +85,7 @@ BlockStoreFile::Create(uint64_t InitialSize) ZEN_TRACE_CPU("BlockStoreFile::Create"); auto ParentPath = m_Path.parent_path(); - if (!std::filesystem::is_directory(ParentPath)) + if (!IsDir(ParentPath)) { CreateDirectories(ParentPath); } @@ -215,7 +215,7 @@ IsMetaDataValid(const std::filesystem::path& BlockPath, const std::filesystem::p } if (MetaWriteTime < BlockWriteTime) { - std::filesystem::remove(MetaPath, Ec); + RemoveFile(MetaPath, Ec); return false; } return true; @@ -239,7 +239,7 @@ BlockStoreFile::MetaSize() const if (IsMetaDataValid(m_Path, MetaPath)) { std::error_code DummyEc; - if (uint64_t Size = std::filesystem::file_size(MetaPath, DummyEc); !DummyEc) + if (uint64_t Size = FileSizeFromPath(MetaPath, DummyEc); !DummyEc) { return Size; } @@ -252,7 +252,7 @@ BlockStoreFile::RemoveMeta() { std::filesystem::path MetaPath = GetMetaPath(); std::error_code DummyEc; - std::filesystem::remove(MetaPath, DummyEc); + RemoveFile(MetaPath, DummyEc); } std::filesystem::path @@ -291,7 +291,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max m_MaxBlockSize = MaxBlockSize; m_MaxBlockCount = MaxBlockCount; - if (std::filesystem::is_directory(m_BlocksBasePath)) + if (IsDir(m_BlocksBasePath)) { uint32_t NextBlockIndex = 0; std::vector<std::filesystem::path> FoldersToScan; @@ -500,7 +500,7 @@ BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&, { OutBlockPath = GetBlockPath(m_BlocksBasePath, ProbeIndex); std::error_code Ec; - bool Exists = std::filesystem::exists(OutBlockPath, Ec); + bool Exists = IsFile(OutBlockPath, Ec); if (Ec) { ZEN_WARN("Failed to probe existence of file '{}' when trying to allocate a new block. Reason: '{}'", @@ -578,7 +578,7 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons } void -BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback) +BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback) { ZEN_MEMSCOPE(GetBlocksTag()); ZEN_TRACE_CPU("BlockStore::WriteChunks"); @@ -1375,14 +1375,14 @@ TEST_CASE("blockstore.blockfile") BoopChunk = File1.GetChunk(5, 5); } - CHECK(std::filesystem::exists(RootDirectory / "1")); + CHECK(IsFile(RootDirectory / "1")); const char* Data = static_cast<const char*>(DataChunk.GetData()); CHECK(std::string(Data) == "data"); const char* Boop = static_cast<const char*>(BoopChunk.GetData()); CHECK(std::string(Boop) == "boop"); } - CHECK(std::filesystem::exists(RootDirectory / "1")); + CHECK(IsFile(RootDirectory / "1")); { IoBuffer DataChunk; @@ -1401,7 +1401,7 @@ TEST_CASE("blockstore.blockfile") const char* Boop = static_cast<const char*>(BoopChunk.GetData()); CHECK(std::string(Boop) == "boop"); } - CHECK(!std::filesystem::exists(RootDirectory / "1")); + CHECK(!IsFile(RootDirectory / "1")); } namespace blockstore::impl { diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp new file mode 100644 index 000000000..f26901458 --- /dev/null +++ b/src/zenstore/buildstore/buildstore.cpp @@ -0,0 +1,1496 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/buildstore/buildstore.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory/llm.h> +#include <zencore/scopeguard.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> + +#include <zencore/uid.h> +#include <zencore/xxhash.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/compress.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + +namespace zen { +const FLLMTag& +GetBuildstoreTag() +{ + static FLLMTag _("store", FLLMTag("builds")); + + return _; +} + +using namespace std::literals; + +namespace blobstore::impl { + + const std::string BaseName = "builds"; + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + + std::filesystem::path GetBlobIndexPath(const std::filesystem::path& RootDirectory) + { + return RootDirectory / (BaseName + IndexExtension); + } + + std::filesystem::path GetBlobLogPath(const std::filesystem::path& RootDirectory) { return RootDirectory / (BaseName + LogExtension); } + + std::filesystem::path GetMetaIndexPath(const std::filesystem::path& RootDirectory) + { + return RootDirectory / (BaseName + "_meta" + IndexExtension); + } + + std::filesystem::path GetMetaLogPath(const std::filesystem::path& RootDirectory) + { + return RootDirectory / (BaseName + "_meta" + LogExtension); + } +} // namespace blobstore::impl + +BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) +: m_Config(Config) +, m_Gc(Gc) +, m_LargeBlobStore(m_Gc) +, m_SmallBlobStore(Gc) +, m_MetadataBlockStore() +{ + ZEN_TRACE_CPU("BuildStore::BuildStore"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + try + { + std::filesystem::path BlobLogPath = blobstore::impl::GetBlobLogPath(Config.RootDirectory); + std::filesystem::path MetaLogPath = blobstore::impl::GetMetaLogPath(Config.RootDirectory); + bool IsNew = !(IsFile(BlobLogPath) && IsFile(MetaLogPath)); + + if (!IsNew) + { + m_BlobLogFlushPosition = ReadPayloadLog(RwLock::ExclusiveLockScope(m_Lock), BlobLogPath, 0); + m_MetaLogFlushPosition = ReadMetadataLog(RwLock::ExclusiveLockScope(m_Lock), MetaLogPath, 0); + } + m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew); + m_SmallBlobStore.Initialize(Config.RootDirectory, + "blob_cas", + m_Config.SmallBlobBlockStoreMaxBlockSize, + m_Config.SmallBlobBlockStoreAlignement, + IsNew); + m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20); + { + BlockStore::BlockIndexSet KnownBlocks; + for (const BlobEntry& Blob : m_BlobEntries) + { + if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) + { + const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; + KnownBlocks.insert(Metadata.Location.BlockIndex); + } + } + m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + } + + m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite); + m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite); + + m_Gc.AddGcReferencer(*this); + m_Gc.AddGcReferenceLocker(*this); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to initialize build store. Reason: '{}'", Ex.what()); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); + } +} + +BuildStore::~BuildStore() +{ + try + { + ZEN_TRACE_CPU("BuildStore::~BuildStore"); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); + Flush(); + m_MetadatalogFile.Close(); + m_PayloadlogFile.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~BuildStore() threw exception: {}", Ex.what()); + } +} + +void +BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) +{ + ZEN_TRACE_CPU("BuildStore::PutBlob"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCompressedBinary); + { + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex BlobIndex = It->second; + if (m_BlobEntries[BlobIndex].Payload) + { + return; + } + } + } + + PayloadEntry Entry; + if (Payload.GetSize() > m_Config.SmallBlobBlockStoreMaxBlockEmbedSize) + { + CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash); + ZEN_UNUSED(Result); + Entry = {.Flags = PayloadEntry::kStandalone}; + } + else + { + CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash); + ZEN_UNUSED(Result); + Entry = {.Flags = 0}; + } + m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); + + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + if (Blob.Payload) + { + m_PayloadEntries[Blob.Payload] = Entry; + } + else + { + Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Entry); + } + Blob.LastAccessTime = GcClock::TickCount(); + } + else + { + PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + // we only remove during GC and compact this then... + m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert({BlobHash, NewBlobIndex}); + } +} + +IoBuffer +BuildStore::GetBlob(const IoHash& BlobHash) +{ + ZEN_TRACE_CPU("BuildStore::GetBlob"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + RwLock::SharedLockScope Lock(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + Blob.LastAccessTime = GcClock::TickCount(); + if (Blob.Payload) + { + const PayloadEntry& Entry = m_PayloadEntries[Blob.Payload]; + const bool IsStandalone = (Entry.Flags & PayloadEntry::kStandalone) != 0; + Lock.ReleaseNow(); + + IoBuffer Chunk; + if (IsStandalone) + { + ZEN_TRACE_CPU("GetLarge"); + Chunk = m_LargeBlobStore.FindChunk(BlobHash); + } + else + { + ZEN_TRACE_CPU("GetSmall"); + Chunk = m_SmallBlobStore.FindChunk(BlobHash); + } + if (Chunk) + { + Chunk.SetContentType(ZenContentType::kCompressedBinary); + return Chunk; + } + else + { + ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? "on disk" : "in block"); + } + } + } + return {}; +} + +std::vector<BuildStore::BlobExistsResult> +BuildStore::BlobsExists(std::span<const IoHash> BlobHashes) +{ + ZEN_TRACE_CPU("BuildStore::BlobsExists"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + std::vector<BuildStore::BlobExistsResult> Result; + Result.reserve(BlobHashes.size()); + RwLock::SharedLockScope _(m_Lock); + for (const IoHash& BlobHash : BlobHashes) + { + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + bool HasPayload = !!Blob.Payload; + bool HasMetadata = !!Blob.Metadata; + Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata}); + } + else + { + Result.push_back({}); + } + } + return Result; +} + +void +BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas) +{ + ZEN_TRACE_CPU("BuildStore::PutMetadatas"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + size_t WriteBlobIndex = 0; + m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span<BlockStoreLocation> Locations) { + RwLock::ExclusiveLockScope _(m_Lock); + for (size_t LocationIndex = 0; LocationIndex < Locations.size(); LocationIndex++) + { + const IoBuffer& Data = MetaDatas[WriteBlobIndex]; + const IoHash& BlobHash = BlobHashes[WriteBlobIndex]; + const BlockStoreLocation& Location = Locations[LocationIndex]; + + MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0}; + m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); + + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + if (Blob.Metadata) + { + m_MetadataEntries[Blob.Metadata] = Entry; + } + else + { + Blob.Metadata = MetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Entry); + } + Blob.LastAccessTime = GcClock::TickCount(); + } + else + { + MetadataIndex NewMetadataIndex = MetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert({BlobHash, NewBlobIndex}); + } + WriteBlobIndex++; + if (m_TrackedCacheKeys) + { + m_TrackedCacheKeys->insert(BlobHash); + } + } + }); +} + +std::vector<IoBuffer> +BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool) +{ + ZEN_TRACE_CPU("BuildStore::GetMetadatas"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + std::vector<BlockStoreLocation> MetaLocations; + std::vector<size_t> MetaLocationResultIndexes; + MetaLocations.reserve(BlobHashes.size()); + MetaLocationResultIndexes.reserve(BlobHashes.size()); + tsl::robin_set<uint32_t> ReferencedBlocks; + + std::vector<IoBuffer> Result; + std::vector<ZenContentType> ResultContentTypes; + Result.resize(BlobHashes.size()); + ResultContentTypes.resize(BlobHashes.size(), ZenContentType::kUnknownContentType); + { + RwLock::SharedLockScope _(m_Lock); + for (size_t Index = 0; Index < BlobHashes.size(); Index++) + { + const IoHash& BlobHash = BlobHashes[Index]; + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& ExistingBlobEntry = m_BlobEntries[ExistingBlobIndex]; + if (ExistingBlobEntry.Metadata) + { + const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata]; + MetaLocations.push_back(ExistingMetadataEntry.Location); + MetaLocationResultIndexes.push_back(Index); + ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex); + ResultContentTypes[Index] = ExistingMetadataEntry.ContentType; + } + ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::TickCount()); + } + } + } + + auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { + if (ChunkIndexes.size() < 4) + { + for (size_t ChunkIndex : ChunkIndexes) + { + IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]); + if (Chunk) + { + size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; + Result[ResultIndex] = std::move(Chunk); + } + } + return true; + } + return m_MetadataBlockStore.IterateBlock( + MetaLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + if (Data != nullptr) + { + size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; + Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size); + } + return true; + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; + Result[ResultIndex] = File.GetChunk(Offset, Size); + return true; + }, + 8u * 1024u); + }; + + if (!MetaLocations.empty()) + { + Latch WorkLatch(1); + + m_MetadataBlockStore.IterateChunks(MetaLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) -> bool { + ZEN_UNUSED(BlockIndex); + if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1) + { + return DoOneBlock(ChunkIndexes); + } + else + { + ZEN_ASSERT(OptionalWorkerPool != nullptr); + WorkLatch.AddCount(1); + try + { + OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + try + { + DoOneBlock(ChunkIndexes); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); + } + }); + } + catch (const std::exception& Ex) + { + WorkLatch.CountDown(); + ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); + } + return true; + } + }); + + WorkLatch.CountDown(); + WorkLatch.Wait(); + } + for (size_t Index = 0; Index < Result.size(); Index++) + { + if (Result[Index]) + { + Result[Index].SetContentType(ResultContentTypes[Index]); + } + } + return Result; +} + +void +BuildStore::Flush() +{ + ZEN_TRACE_CPU("BuildStore::Flush"); + try + { + m_LargeBlobStore.Flush(); + m_SmallBlobStore.Flush(); + m_MetadataBlockStore.Flush(false); + + m_PayloadlogFile.Flush(); + m_MetadatalogFile.Flush(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("BuildStore::Flush failed. Reason: {}", Ex.what()); + } +} + +void +BuildStore::CompactState() +{ + ZEN_TRACE_CPU("BuildStore::CompactState"); + + std::vector<BlobEntry> BlobEntries; + std::vector<PayloadEntry> PayloadEntries; + std::vector<MetadataEntry> MetadataEntries; + + tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> BlobLookup; + + RwLock::ExclusiveLockScope _(m_Lock); + const size_t EntryCount = m_BlobLookup.size(); + BlobLookup.reserve(EntryCount); + const size_t PayloadCount = m_PayloadEntries.size(); + PayloadEntries.reserve(PayloadCount); + const size_t MetadataCount = m_MetadataEntries.size(); + MetadataEntries.reserve(MetadataCount); + + for (auto LookupIt : m_BlobLookup) + { + const IoHash& BlobHash = LookupIt.first; + const BlobIndex ReadBlobIndex = LookupIt.second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + + const BlobIndex WriteBlobIndex(gsl::narrow<uint32_t>(BlobEntries.size())); + BlobEntries.push_back(ReadBlobEntry); + BlobEntry& WriteBlobEntry = BlobEntries.back(); + + if (WriteBlobEntry.Payload) + { + const PayloadEntry& ReadPayloadEntry = m_PayloadEntries[ReadBlobEntry.Payload]; + WriteBlobEntry.Payload = PayloadIndex(gsl::narrow<uint32_t>(PayloadEntries.size())); + PayloadEntries.push_back(ReadPayloadEntry); + } + if (ReadBlobEntry.Metadata) + { + const MetadataEntry& ReadMetadataEntry = m_MetadataEntries[ReadBlobEntry.Metadata]; + WriteBlobEntry.Metadata = MetadataIndex(gsl::narrow<uint32_t>(MetadataEntries.size())); + MetadataEntries.push_back(ReadMetadataEntry); + } + + BlobLookup.insert({BlobHash, WriteBlobIndex}); + } + m_BlobEntries.swap(BlobEntries); + m_PayloadEntries.swap(PayloadEntries); + m_MetadataEntries.swap(MetadataEntries); + m_BlobLookup.swap(BlobLookup); +} + +uint64_t +BuildStore::ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + ZEN_TRACE_CPU("BuildStore::ReadPayloadLog"); + if (!IsFile(LogPath)) + { + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read build store '{}' payload log containing {} entries in {}", + LogPath, + LogEntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<PayloadDiskEntry> CasLog; + if (!CasLog.IsValid(LogPath)) + { + RemoveFile(LogPath); + return 0; + } + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full payload log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const PayloadDiskEntry& Record) { + std::string InvalidEntryReason; + if (Record.Entry.Flags & PayloadEntry::kTombStone) + { + // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation + if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end()) + { + if (!m_BlobEntries[ExistingIt->second].Metadata) + { + m_BlobLookup.erase(ExistingIt); + } + else + { + m_BlobEntries[ExistingIt->second].Payload = {}; + } + } + return; + } + + if (!ValidatePayloadDiskEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid payload entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; + if (ExistingBlob.Payload) + { + const PayloadIndex ExistingPayloadIndex = ExistingBlob.Payload; + m_PayloadEntries[ExistingPayloadIndex] = Record.Entry; + } + else + { + const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Record.Entry); + ExistingBlob.Payload = NewPayloadIndex; + } + } + else + { + const PayloadIndex NewPayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Record.Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); + } + }, + SkipEntryCount); + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid payload entries in '{}'", InvalidEntryCount, LogPath); + } + + return LogEntryCount; +} + +uint64_t +BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + ZEN_TRACE_CPU("BuildStore::ReadMetadataLog"); + if (!IsFile(LogPath)) + { + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read build store '{}' metadata log containing {} entries in {}", + LogPath, + LogEntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<MetadataDiskEntry> CasLog; + if (!CasLog.IsValid(LogPath)) + { + RemoveFile(LogPath); + return 0; + } + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full metadata log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const MetadataDiskEntry& Record) { + std::string InvalidEntryReason; + if (Record.Entry.Flags & MetadataEntry::kTombStone) + { + // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation + // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation + if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end()) + { + if (!m_BlobEntries[ExistingIt->second].Payload) + { + m_BlobLookup.erase(ExistingIt); + } + else + { + m_BlobEntries[ExistingIt->second].Metadata = {}; + } + } + return; + } + + if (!ValidateMetadataDiskEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid metadata entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (auto It = m_BlobLookup.find(Record.BlobHash); It != m_BlobLookup.end()) + { + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& ExistingBlob = m_BlobEntries[ExistingBlobIndex]; + if (ExistingBlob.Metadata) + { + const MetadataIndex ExistingMetadataIndex = ExistingBlob.Metadata; + m_MetadataEntries[ExistingMetadataIndex] = Record.Entry; + } + else + { + const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Record.Entry); + ExistingBlob.Metadata = NewMetadataIndex; + } + } + else + { + const MetadataIndex NewMetadataIndex(gsl::narrow<uint32_t>(m_MetadataEntries.size())); + m_MetadataEntries.push_back(Record.Entry); + + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert_or_assign(Record.BlobHash, NewBlobIndex); + } + }, + SkipEntryCount); + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid metadata entries in '{}'", InvalidEntryCount, LogPath); + } + + return LogEntryCount; +} + +bool +BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason) +{ + if (Entry.BlobHash == IoHash::Zero) + { + OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Flags & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.Flags, Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Flags & PayloadEntry::kTombStone) + { + return true; + } + if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0 || Entry.Entry.Reserved3 != 0) + { + OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + return true; +} + +bool +BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason) +{ + if (Entry.BlobHash == IoHash::Zero) + { + OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Location.Size == 0) + { + OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.Location.Size); + return false; + } + if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0) + { + OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Entry.Flags & MetadataEntry::kTombStone) + { + return true; + } + if (Entry.Entry.ContentType == ZenContentType::kCOUNT) + { + OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + if (Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0) + { + OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); + return false; + } + return true; +} + +class BuildStoreGcReferenceChecker : public GcReferenceChecker +{ +public: + BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {} + virtual std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); + } + + virtual void PreCache(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } + + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Builds::UpdateLockedState"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + m_References.reserve(m_Store.m_BlobLookup.size()); + for (const auto& It : m_Store.m_BlobLookup) + { + const BuildStore::BlobIndex ExistingBlobIndex = It.second; + if (m_Store.m_BlobEntries[ExistingBlobIndex].Payload) + { + m_References.push_back(It.first); + } + } + FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore"), m_References); + } + + virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override + { + ZEN_UNUSED(Ctx); + ZEN_TRACE_CPU("Builds::GetUnusedReferences"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + size_t InitialCount = IoCids.size(); + size_t UsedCount = InitialCount; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: buildstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + "buildstore", + UsedCount, + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); + UsedCount = IoCids.size() - UnusedReferences.size(); + return UnusedReferences; + } + +private: + BuildStore& m_Store; + std::vector<IoHash> m_References; +}; + +std::string +BuildStore::GetGcName(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string()); +} + +class BuildStoreGcCompator : public GcStoreCompactor +{ + using BlobEntry = BuildStore::BlobEntry; + using PayloadEntry = BuildStore::PayloadEntry; + using MetadataEntry = BuildStore::MetadataEntry; + using MetadataDiskEntry = BuildStore::MetadataDiskEntry; + using BlobIndex = BuildStore::BlobIndex; + using PayloadIndex = BuildStore::PayloadIndex; + using MetadataIndex = BuildStore::MetadataIndex; + +public: + BuildStoreGcCompator(BuildStore& Store, std::vector<IoHash>&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {} + + virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override + { + ZEN_UNUSED(ClaimDiskReserveCallback); + ZEN_TRACE_CPU("Builds::CompactStore"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}", + m_Store.m_Config.RootDirectory, + NiceBytes(Stats.RemovedDisk), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + if (!m_RemovedBlobs.empty()) + { + if (Ctx.Settings.CollectSmallObjects) + { + m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique<HashSet>(); }); + auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); }); + + BlockStore::BlockUsageMap BlockUsage; + { + RwLock::SharedLockScope __(m_Store.m_Lock); + + for (auto LookupIt : m_Store.m_BlobLookup) + { + const BlobIndex ReadBlobIndex = LookupIt.second; + const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex]; + + if (ReadBlobEntry.Metadata) + { + const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata]; + + uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex; + uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement); + + if (auto BlockUsageIt = BlockUsage.find(BlockIndex); BlockUsageIt != BlockUsage.end()) + { + BlockStore::BlockUsageInfo& Info = BlockUsageIt.value(); + Info.EntryCount++; + Info.DiskUsage += ChunkSize; + } + else + { + BlockUsage.insert_or_assign(BlockIndex, + BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); + } + } + } + } + + BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90); + BlockStoreCompactState BlockCompactState; + std::vector<IoHash> BlockCompactStateKeys; + BlockCompactState.IncludeBlocks(BlocksToCompact); + + if (BlocksToCompact.size() > 0) + { + { + RwLock::SharedLockScope ___(m_Store.m_Lock); + for (const auto& Entry : m_Store.m_BlobLookup) + { + BlobIndex Index = Entry.second; + + if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) + { + if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location)) + { + BlockCompactStateKeys.push_back(Entry.first); + } + } + } + } + + if (Ctx.Settings.IsDeleteMode) + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks", + m_Store.m_Config.RootDirectory, + BlocksToCompact.size()); + } + + m_Store.m_MetadataBlockStore.CompactBlocks( + BlockCompactState, + m_Store.m_Config.MetadataBlockStoreAlignement, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + std::vector<MetadataDiskEntry> MovedEntries; + MovedEntries.reserve(MovedArray.size()); + RwLock::ExclusiveLockScope _(m_Store.m_Lock); + for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) + { + size_t ChunkIndex = Moved.first; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + + ZEN_ASSERT(m_Store.m_TrackedCacheKeys); + if (m_Store.m_TrackedCacheKeys->contains(Key)) + { + continue; + } + + if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end()) + { + const BlobIndex Index = It->second; + + if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) + { + m_Store.m_MetadataEntries[Meta].Location = Moved.second; + MovedEntries.push_back( + MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key}); + } + } + } + m_Store.m_MetadatalogFile.Append(MovedEntries); + Stats.RemovedDisk += FreedDiskSpace; + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + return true; + }, + ClaimDiskReserveCallback, + fmt::format("GCV2: buildstore [COMPACT] '{}': ", m_Store.m_Config.RootDirectory)); + } + else + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks", + m_Store.m_Config.RootDirectory, + BlocksToCompact.size()); + } + } + } + } + } + } + + virtual std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); + } + +private: + BuildStore& m_Store; + const std::vector<IoHash> m_RemovedBlobs; +}; + +GcStoreCompactor* +BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) +{ + ZEN_TRACE_CPU("Builds::RemoveExpiredData"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: buildstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", + m_Config.RootDirectory, + Stats.CheckedCount, + Stats.FoundCount, + Stats.DeletedCount, + NiceBytes(Stats.FreedMemory), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + }); + + const GcClock::Tick ExpireTicks = Ctx.Settings.BuildStoreExpireTime.time_since_epoch().count(); + + std::vector<IoHash> ExpiredBlobs; + { + RwLock::SharedLockScope __(m_Lock); + for (const auto& It : m_BlobLookup) + { + const BlobIndex ReadBlobIndex = It.second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + + const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; + if (AccessTick < ExpireTicks) + { + ExpiredBlobs.push_back(It.first); + } + } + Stats.CheckedCount += m_BlobLookup.size(); + Stats.FoundCount += ExpiredBlobs.size(); + } + + std::vector<IoHash> RemovedBlobs; + if (!ExpiredBlobs.empty()) + { + if (Ctx.Settings.IsDeleteMode) + { + RemovedBlobs.reserve(ExpiredBlobs.size()); + + std::vector<PayloadDiskEntry> RemovedPayloads; + std::vector<MetadataDiskEntry> RemoveMetadatas; + + RwLock::ExclusiveLockScope __(m_Lock); + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + + for (const IoHash& ExpiredBlob : ExpiredBlobs) + { + if (auto It = m_BlobLookup.find(ExpiredBlob); It != m_BlobLookup.end()) + { + const BlobIndex ReadBlobIndex = It->second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + + const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; + + if (AccessTick < ExpireTicks) + { + if (ReadBlobEntry.Payload) + { + RemovedPayloads.push_back( + PayloadDiskEntry{.Entry = m_PayloadEntries[ReadBlobEntry.Payload], .BlobHash = ExpiredBlob}); + RemovedPayloads.back().Entry.Flags |= PayloadEntry::kTombStone; + m_PayloadEntries[ReadBlobEntry.Payload] = {}; + m_BlobEntries[ReadBlobIndex].Payload = {}; + } + if (ReadBlobEntry.Metadata) + { + RemoveMetadatas.push_back( + MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob}); + RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; + m_MetadataEntries[ReadBlobEntry.Metadata] = {}; + m_BlobEntries[ReadBlobIndex].Metadata = {}; + } + + m_BlobLookup.erase(It); + + RemovedBlobs.push_back(ExpiredBlob); + Stats.DeletedCount++; + } + } + } + if (!RemovedPayloads.empty()) + { + m_PayloadlogFile.Append(RemovedPayloads); + } + if (!RemoveMetadatas.empty()) + { + m_MetadatalogFile.Append(RemoveMetadatas); + } + } + } + + if (!RemovedBlobs.empty()) + { + CompactState(); + } + + return new BuildStoreGcCompator(*this, std::move(RemovedBlobs)); +} + +std::vector<GcReferenceChecker*> +BuildStore::CreateReferenceCheckers(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + ZEN_MEMSCOPE(GetBuildstoreTag()); + return {new BuildStoreGcReferenceChecker(*this)}; +} + +std::vector<GcReferenceValidator*> +BuildStore::CreateReferenceValidators(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + return {}; +} + +std::vector<RwLock::SharedLockScope> +BuildStore::LockState(GcCtx& Ctx) +{ + ZEN_UNUSED(Ctx); + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); + return Locks; +} + +/* + ___________ __ + \__ ___/___ _______/ |_ ______ + | |_/ __ \ / ___/\ __\/ ___/ + | |\ ___/ \___ \ | | \___ \ + |____| \___ >____ > |__| /____ > + \/ \/ \/ +*/ + +#if ZEN_WITH_TESTS + +TEST_CASE("BuildStore.Blobs") +{ + ScopedTemporaryDirectory _; + + BuildStoreConfig Config; + Config.RootDirectory = _.Path() / "build_store"; + + std::vector<IoHash> CompressedBlobsHashes; + { + GcManager Gc; + BuildStore Store(Config, Gc); + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + + for (const IoHash& RawHash : CompressedBlobsHashes) + { + IoBuffer Payload = Store.GetBlob(RawHash); + CHECK(Payload); + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (const IoHash& RawHash : CompressedBlobsHashes) + { + IoBuffer Payload = Store.GetBlob(RawHash); + CHECK(Payload); + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (const IoHash& RawHash : CompressedBlobsHashes) + { + IoBuffer Payload = Store.GetBlob(RawHash); + CHECK(Payload); + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + } +} + +namespace blockstore::testing { + IoBuffer MakeMetaData(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues) + { + CbObjectWriter Writer; + Writer.AddHash("rawHash"sv, BlobHash); + Writer.BeginObject("values"); + { + for (const auto& V : KeyValues) + { + Writer.AddString(V.first, V.second); + } + } + Writer.EndObject(); // values + return Writer.Save().GetBuffer().AsIoBuffer(); + }; + +} // namespace blockstore::testing + +TEST_CASE("BuildStore.Metadata") +{ + using namespace blockstore::testing; + + ScopedTemporaryDirectory _; + + BuildStoreConfig Config; + Config.RootDirectory = _.Path() / "build_store"; + + std::vector<IoHash> BlobHashes; + std::vector<IoBuffer> MetaPayloads; + { + GcManager Gc; + BuildStore Store(Config, Gc); + + for (size_t I = 0; I < 5; I++) + { + BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I))); + MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); + MetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(BlobHashes, MetaPayloads); + + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); + for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) + { + const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]); + CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash); + } + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); + for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) + { + const IoHash ExpectedHash = IoHash::HashBuffer(MetaPayloads[I]); + CHECK_EQ(IoHash::HashBuffer(ValidateMetaPayloads[I]), ExpectedHash); + } + for (const IoHash& BlobHash : BlobHashes) + { + CHECK(!Store.GetBlob(BlobHash)); + } + } + std::vector<IoHash> CompressedBlobsHashes; + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + for (const auto& MetadataIt : MetadataPayloads) + { + CHECK(!MetadataIt); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); + } + } + + std::vector<IoBuffer> BlobMetaPayloads; + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK_EQ(IoHash::HashBuffer(MetadataPayload), IoHash::HashBuffer(BlobMetaPayloads[I])); + } + } + + { + GcManager Gc; + BuildStore Store(Config, Gc); + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); + } + + BlobMetaPayloads.clear(); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + BlobMetaPayloads.push_back( + MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}})); + BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK_EQ(IoHash::HashBuffer(DecompressedBlob), BlobHash); + } + } +} + +TEST_CASE("BuildStore.GC") +{ + using namespace blockstore::testing; + + ScopedTemporaryDirectory _; + + BuildStoreConfig Config; + Config.RootDirectory = _.Path() / "build_store"; + + std::vector<IoHash> CompressedBlobsHashes; + std::vector<IoBuffer> BlobMetaPayloads; + { + GcManager Gc; + BuildStore Store(Config, Gc); + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + Store.PutBlob(CompressedBlobsHashes.back(), Payload); + } + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); + } + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + } + { + GcManager Gc; + BuildStore Store(Config, Gc); + + { + GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1), + .CollectSmallObjects = false, + .IsDeleteMode = false, + .Verbose = true}); + CHECK(!Result.WasCancelled); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(Blob); + IoBuffer DecompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(Blob)).Decompress().AsIoBuffer(); + CHECK(DecompressedBlob); + CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash); + } + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(IoHash::HashBuffer(MetadataPayload) == IoHash::HashBuffer(BlobMetaPayloads[I])); + } + } + { + GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() + std::chrono::hours(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .Verbose = true}); + CHECK(!Result.WasCancelled); + for (const IoHash& BlobHash : CompressedBlobsHashes) + { + IoBuffer Blob = Store.GetBlob(BlobHash); + CHECK(!Blob); + } + + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); + for (size_t I = 0; I < MetadataPayloads.size(); I++) + { + const IoBuffer& MetadataPayload = MetadataPayloads[I]; + CHECK(!MetadataPayload); + } + } + } +} + +void +buildstore_forcelink() +{ +} + +#endif + +} // namespace zen diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 61552fafc..e4d962b56 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -200,21 +200,21 @@ namespace cache::impl { int DropIndex = 0; do { - if (!std::filesystem::exists(Dir)) + if (!IsDir(Dir)) { return false; } std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; - if (std::filesystem::exists(DroppedBucketPath)) + if (IsDir(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; - std::filesystem::rename(Dir, DroppedBucketPath, Ec); + RenameDirectory(Dir, DroppedBucketPath, Ec); if (!Ec) { DeleteDirectories(DroppedBucketPath); @@ -909,16 +909,16 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, { std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName); - if (std::filesystem::is_regular_file(LogPath)) + if (IsFile(LogPath)) { - if (!std::filesystem::remove(LogPath, Ec) || Ec) + if (!RemoveFile(LogPath, Ec) || Ec) { ZEN_WARN("snapshot failed to clean log file '{}', removing index at '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); std::error_code RemoveIndexEc; - std::filesystem::remove(IndexPath, RemoveIndexEc); + RemoveFile(IndexPath, RemoveIndexEc); } } } @@ -939,7 +939,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const { ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); - if (!std::filesystem::is_regular_file(IndexPath)) + if (!IsFile(IndexPath)) { return 0; } @@ -1023,7 +1023,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std:: { ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); - if (!std::filesystem::is_regular_file(LogPath)) + if (!IsFile(LogPath)) { return 0; } @@ -1103,37 +1103,37 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco if (IsNew) { - fs::remove(LogPath); - fs::remove(IndexPath); - fs::remove_all(m_BlocksBasePath); + RemoveFile(LogPath); + RemoveFile(IndexPath); + DeleteDirectories(m_BlocksBasePath); } CreateDirectories(m_BucketDir); m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); - if (std::filesystem::is_regular_file(IndexPath)) + if (IsFile(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); - std::filesystem::remove(IndexPath); + RemoveFile(IndexPath); } } uint64_t LogEntryCount = 0; - if (std::filesystem::is_regular_file(LogPath)) + if (IsFile(LogPath)) { if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) { LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition); } - else if (fs::is_regular_file(LogPath)) + else if (IsFile(LogPath)) { ZEN_WARN("removing invalid log at '{}'", LogPath); - std::filesystem::remove(LogPath); + RemoveFile(LogPath); } } @@ -2146,7 +2146,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); std::error_code Ec; - uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); + uintmax_t size = FileSizeFromPath(DataFilePath.ToPath(), Ec); if (Ec) { ReportBadKey(HashKey); @@ -2287,11 +2287,11 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) BuildPath(Path, Entry.Key); fs::path FilePath = Path.ToPath(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); - if (fs::is_regular_file(FilePath)) + if (IsFile(FilePath)) { ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); std::error_code Ec; - fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... + RemoveFile(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... } } } @@ -2424,7 +2424,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c if (CleanUpTempFile) { std::error_code Ec; - std::filesystem::remove(DataFile.GetPath(), Ec); + RemoveFile(DataFile.GetPath(), Ec); if (Ec) { ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", @@ -2452,7 +2452,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); // We do a speculative remove of the file instead of probing with a exists call and check the error code instead - std::filesystem::remove(FsPath, Ec); + RemoveFile(FsPath, Ec); if (Ec) { if (Ec.value() != ENOENT) @@ -2460,7 +2460,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); Sleep(100); Ec.clear(); - std::filesystem::remove(FsPath, Ec); + RemoveFile(FsPath, Ec); if (Ec && Ec.value() != ENOENT) { throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); @@ -2731,9 +2731,10 @@ class DiskBucketStoreCompactor : public GcStoreCompactor using CacheBucket = ZenCacheDiskLayer::CacheBucket; public: - DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys) + DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys, bool FlushBucket) : m_Bucket(Bucket) , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys)) + , m_FlushBucket(FlushBucket) { m_ExpiredStandaloneKeys.shrink_to_fit(); } @@ -2791,7 +2792,7 @@ public: ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); std::error_code Ec; - if (!fs::remove(FilePath, Ec)) + if (!RemoveFile(FilePath, Ec)) { continue; } @@ -2812,7 +2813,7 @@ public: ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); std::error_code Ec; - bool Existed = std::filesystem::is_regular_file(FilePath, Ec); + bool Existed = IsFile(FilePath, Ec); if (Ec) { ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'", @@ -2960,6 +2961,10 @@ public: } } } + if (m_FlushBucket) + { + m_Bucket.Flush(); + } } virtual std::string GetGcName(GcCtx& Ctx) override { return m_Bucket.GetGcName(Ctx); } @@ -2967,6 +2972,7 @@ public: private: ZenCacheDiskLayer::CacheBucket& m_Bucket; std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys; + bool m_FlushBucket = false; }; GcStoreCompactor* @@ -2990,24 +2996,6 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } - if (Stats.DeletedCount > 0) - { - bool Expected = false; - if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) - { - return; - } - auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - - try - { - SaveSnapshot([]() { return 0; }); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed to write index and manifest after RemoveExpiredData in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); - } - } }); const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count(); @@ -3094,7 +3082,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) return nullptr; } - return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); + return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys), /*FlushBucket*/ Stats.DeletedCount > 0); } bool diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 73c10a6db..ed42f254e 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -118,7 +118,7 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig) // Ensure root directory exists - create if it doesn't exist already - std::filesystem::create_directories(m_Config.RootDirectory); + CreateDirectories(m_Config.RootDirectory); // Open or create manifest diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp index 6c7b1b297..492ce9317 100644 --- a/src/zenstore/caslog.cpp +++ b/src/zenstore/caslog.cpp @@ -37,7 +37,7 @@ CasLogFile::~CasLogFile() bool CasLogFile::IsValid(std::filesystem::path FileName, size_t RecordSize) { - if (!std::filesystem::is_regular_file(FileName)) + if (!IsFile(FileName)) { return false; } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 2be0542db..184251da7 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -226,7 +226,7 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) } std::vector<CasStore::InsertResult> -CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes) +CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes) { ZEN_MEMSCOPE(GetCasContainerTag()); @@ -323,7 +323,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) } bool -CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, +CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) @@ -927,10 +927,10 @@ CasContainerStrategy::MakeIndexSnapshot() fs::path TempIndexPath = cas::impl::GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(TempIndexPath)) + if (IsFile(TempIndexPath)) { std::error_code Ec; - if (!fs::remove(TempIndexPath, Ec) || Ec) + if (!RemoveFile(TempIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", TempIndexPath, Ec.message()); return; @@ -939,9 +939,9 @@ CasContainerStrategy::MakeIndexSnapshot() try { - if (fs::is_regular_file(IndexPath)) + if (IsFile(IndexPath)) { - fs::rename(IndexPath, TempIndexPath); + RenameFile(IndexPath, TempIndexPath); } // Write the current state of the location map to a new index state @@ -992,21 +992,21 @@ CasContainerStrategy::MakeIndexSnapshot() // Restore any previous snapshot - if (fs::is_regular_file(TempIndexPath)) + if (IsFile(TempIndexPath)) { std::error_code Ec; - fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless - fs::rename(TempIndexPath, IndexPath, Ec); + RemoveFile(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless + RenameFile(TempIndexPath, IndexPath, Ec); if (Ec) { ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", TempIndexPath, Ec.message()); } } } - if (fs::is_regular_file(TempIndexPath)) + if (IsFile(TempIndexPath)) { std::error_code Ec; - if (!fs::remove(TempIndexPath, Ec) || Ec) + if (!RemoveFile(TempIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempIndexPath, Ec.message()); } @@ -1092,7 +1092,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski if (!TCasLogFile<CasDiskIndexEntry>::IsValid(LogPath)) { ZEN_WARN("removing invalid cas log at '{}'", LogPath); - std::filesystem::remove(LogPath); + RemoveFile(LogPath); return 0; } @@ -1155,7 +1155,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) if (IsNewStore) { - std::filesystem::remove_all(BasePath); + DeleteDirectories(BasePath); } CreateDirectories(BasePath); @@ -1165,19 +1165,19 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::path LogPath = cas::impl::GetLogPath(m_RootDirectory, m_ContainerBaseName); std::filesystem::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName); - if (std::filesystem::is_regular_file(IndexPath)) + if (IsFile(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); - std::filesystem::remove(IndexPath); + RemoveFile(IndexPath); } } uint64_t LogEntryCount = 0; - if (std::filesystem::is_regular_file(LogPath)) + if (IsFile(LogPath)) { if (TCasLogFile<CasDiskIndexEntry>::IsValid(LogPath)) { @@ -1186,7 +1186,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) else { ZEN_WARN("removing invalid cas log at '{}'", LogPath); - std::filesystem::remove(LogPath); + RemoveFile(LogPath); } } diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 07e620086..2eb4c233a 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -52,11 +52,11 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore ~CasContainerStrategy(); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); - std::vector<CasStore::InsertResult> InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes); + std::vector<CasStore::InsertResult> InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(HashKeySet& InOutChunks); - bool IterateChunks(std::span<IoHash> ChunkHashes, + bool IterateChunks(std::span<const IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit); diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 34db51aa9..14bdc41f0 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -176,10 +176,10 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN if (IsNewStore) { - std::filesystem::remove(LogPath); - std::filesystem::remove(IndexPath); + RemoveFile(LogPath); + RemoveFile(IndexPath); - if (std::filesystem::is_directory(m_RootDirectory)) + if (IsDir(m_RootDirectory)) { // We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside // in this folder as well @@ -211,24 +211,24 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); for (const std::filesystem::path& SharededRoot : CasVisitor.ShardedRoots) { - std::filesystem::remove_all(SharededRoot); + DeleteDirectories(SharededRoot); } } } - if (std::filesystem::is_regular_file(IndexPath)) + if (IsFile(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); - std::filesystem::remove(IndexPath); + RemoveFile(IndexPath); } } uint64_t LogEntryCount = 0; - if (std::filesystem::is_regular_file(LogPath)) + if (IsFile(LogPath)) { if (TCasLogFile<FileCasIndexEntry>::IsValid(LogPath)) { @@ -237,7 +237,7 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN else { ZEN_WARN("removing invalid cas log at '{}'", LogPath); - std::filesystem::remove(LogPath); + RemoveFile(LogPath); } } @@ -327,7 +327,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: { std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); std::error_code Ec; - std::filesystem::rename(ChunkPath, TempPath, Ec); + RenameFile(ChunkPath, TempPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("unable to move existing CAS file {} to {}", ChunkPath, TempPath)); @@ -452,7 +452,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: { PayloadFile.Close(); std::error_code DummyEc; - std::filesystem::remove(ChunkPath, DummyEc); + RemoveFile(ChunkPath, DummyEc); throw; } bool IsNew = UpdateIndex(ChunkHash, Chunk.Size()); @@ -503,7 +503,7 @@ FileCasStrategy::SafeOpenChunk(const IoHash& ChunkHash, uint64 ExpectedSize) { std::error_code Ec; std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); - std::filesystem::rename(ChunkPath, TempPath, Ec); + RenameFile(ChunkPath, TempPath, Ec); if (!Ec) { Chunk.SetDeleteOnClose(true); @@ -574,7 +574,7 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) ShardingHelper Name(m_RootDirectory, ChunkHash); const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); - uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(ChunkPath, Ec)); + uint64_t FileSize = static_cast<uint64_t>(FileSizeFromPath(ChunkPath, Ec)); if (Ec) { ZEN_WARN("get file size FAILED, file cas '{}'", ChunkPath); @@ -582,9 +582,9 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) } ZEN_DEBUG("deleting CAS payload file '{}' {}", ChunkPath, NiceBytes(FileSize)); - std::filesystem::remove(ChunkPath, Ec); + RemoveFile(ChunkPath, Ec); - if (!Ec || !std::filesystem::exists(ChunkPath)) + if (!Ec || !IsFile(ChunkPath)) { { RwLock::ExclusiveLockScope _(m_Lock); @@ -941,10 +941,10 @@ FileCasStrategy::MakeIndexSnapshot() fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory); // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) + if (IsFile(STmpIndexPath)) { std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) + if (!RemoveFile(STmpIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message()); return; @@ -953,9 +953,9 @@ FileCasStrategy::MakeIndexSnapshot() try { - if (fs::is_regular_file(IndexPath)) + if (IsFile(IndexPath)) { - fs::rename(IndexPath, STmpIndexPath); + RenameFile(IndexPath, STmpIndexPath); } // Write the current state of the location map to a new index state @@ -1004,21 +1004,21 @@ FileCasStrategy::MakeIndexSnapshot() // Restore any previous snapshot - if (fs::is_regular_file(STmpIndexPath)) + if (IsFile(STmpIndexPath)) { std::error_code Ec; - fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless - fs::rename(STmpIndexPath, IndexPath, Ec); + RemoveFile(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless + RenameFile(STmpIndexPath, IndexPath, Ec); if (Ec) { ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message()); } } } - if (fs::is_regular_file(STmpIndexPath)) + if (IsFile(STmpIndexPath)) { std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) + if (!RemoveFile(STmpIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message()); } @@ -1032,7 +1032,7 @@ FileCasStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& using namespace filecas::impl; std::vector<FileCasIndexEntry> Entries; - if (std::filesystem::is_regular_file(IndexPath)) + if (IsFile(IndexPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -1077,7 +1077,7 @@ FileCasStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& return 0; } - if (std::filesystem::is_directory(m_RootDirectory)) + if (IsDir(m_RootDirectory)) { ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory); TCasLogFile<FileCasIndexEntry> CasLog; @@ -1116,7 +1116,7 @@ FileCasStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntr using namespace filecas::impl; - if (std::filesystem::is_regular_file(LogPath)) + if (IsFile(LogPath)) { uint64_t LogEntryCount = 0; Stopwatch Timer; @@ -1274,12 +1274,12 @@ public: ChunkPath); } std::error_code Ec; - uint64_t SizeOnDisk = std::filesystem::file_size(ChunkPath, Ec); + uint64_t SizeOnDisk = FileSizeFromPath(ChunkPath, Ec); if (Ec) { SizeOnDisk = 0; } - bool Existed = std::filesystem::remove(ChunkPath, Ec); + bool Existed = RemoveFile(ChunkPath, Ec); if (Ec) { // Target file may be open for read, attempt to move it to a temp file and mark it delete on close @@ -1290,7 +1290,7 @@ public: if (OldChunk) { std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); - std::filesystem::rename(ChunkPath, TempPath, Ec); + RenameFile(ChunkPath, TempPath, Ec); if (!Ec) { OldChunk.SetDeleteOnClose(true); @@ -1317,7 +1317,7 @@ public: else { std::error_code Ec; - bool Existed = std::filesystem::is_regular_file(ChunkPath, Ec); + bool Existed = IsFile(ChunkPath, Ec); if (Ec) { if (Ctx.Settings.Verbose) @@ -1516,7 +1516,7 @@ TEST_CASE("cas.chunk.moveoverwrite") Payload1.SetDeleteOnClose(true); CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, CompressedPayload1.DecodeRawHash()); CHECK_EQ(Result.New, true); - CHECK(!std::filesystem::exists(Payload1Path)); + CHECK(!IsFile(Payload1Path)); } { std::filesystem::path Payload1BPath{TempDir.Path() / "payload_1"}; @@ -1526,9 +1526,9 @@ TEST_CASE("cas.chunk.moveoverwrite") CasStore::InsertResult Result = FileCas.InsertChunk(Payload1B, CompressedPayload1.DecodeRawHash()); CHECK_EQ(Result.New, false); - CHECK(std::filesystem::exists(Payload1BPath)); + CHECK(IsFile(Payload1BPath)); Payload1B = {}; - CHECK(!std::filesystem::exists(Payload1BPath)); + CHECK(!IsFile(Payload1BPath)); } IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash()); @@ -1554,7 +1554,7 @@ TEST_CASE("cas.chunk.moveoverwrite") } Payload2 = {}; - CHECK(!std::filesystem::exists(Payload2Path)); + CHECK(!IsFile(Payload2Path)); { IoHash RawHash; @@ -1598,9 +1598,9 @@ TEST_CASE("cas.chunk.copyoverwrite") CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, CompressedPayload1.DecodeRawHash(), CasStore::InsertMode::kCopyOnly); CHECK_EQ(Result.New, true); - CHECK(std::filesystem::exists(Payload1Path)); + CHECK(IsFile(Payload1Path)); Payload1 = {}; - CHECK(!std::filesystem::exists(Payload1Path)); + CHECK(!IsFile(Payload1Path)); } { std::filesystem::path Payload1BPath{TempDir.Path() / "payload_1"}; @@ -1611,9 +1611,9 @@ TEST_CASE("cas.chunk.copyoverwrite") CasStore::InsertResult Result = FileCas.InsertChunk(Payload1B, CompressedPayload1.DecodeRawHash(), CasStore::InsertMode::kCopyOnly); CHECK_EQ(Result.New, false); - CHECK(std::filesystem::exists(Payload1BPath)); + CHECK(IsFile(Payload1BPath)); Payload1B = {}; - CHECK(!std::filesystem::exists(Payload1BPath)); + CHECK(!IsFile(Payload1BPath)); } IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash()); @@ -1640,7 +1640,7 @@ TEST_CASE("cas.chunk.copyoverwrite") } Payload2 = {}; - CHECK(!std::filesystem::exists(Payload2Path)); + CHECK(!IsFile(Payload2Path)); { IoHash RawHash; diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 7ac10d613..ac4dda83f 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -62,11 +62,11 @@ namespace { { if (Size == 0) { - std::filesystem::remove(Path); + RemoveFile(Path); return std::error_code{}; } CreateDirectories(Path.parent_path()); - if (std::filesystem::is_regular_file(Path) && std::filesystem::file_size(Path) == Size) + if (IsFile(Path) && FileSizeFromPath(Path) == Size) { return std::error_code(); } @@ -1081,7 +1081,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size()); { ZEN_TRACE_CPU("GcV2::LockReferencers"); - // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore/BuildStore) until // we delete the ReferenceLockers Latch WorkLeft(1); { @@ -1108,7 +1108,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_TRACE_CPU("GcV2::UpdateLockedState"); // Locking all references checkers so we have a steady state of which references are used - // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore/BuildStore) until // we delete the ReferenceCheckers Latch WorkLeft(1); @@ -1262,12 +1262,12 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_TRACE_CPU("GcV2::CompactStores"); auto ClaimDiskReserve = [&]() -> uint64_t { - if (!std::filesystem::is_regular_file(Settings.DiskReservePath)) + if (!IsFile(Settings.DiskReservePath)) { return 0; } - uint64_t ReclaimedSize = std::filesystem::file_size(Settings.DiskReservePath); - if (std::filesystem::remove(Settings.DiskReservePath)) + uint64_t ReclaimedSize = FileSizeFromPath(Settings.DiskReservePath); + if (RemoveFile(Settings.DiskReservePath)) { return ReclaimedSize; } @@ -1557,7 +1557,7 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config) m_Config.LightweightInterval = m_Config.MonitorInterval; } - std::filesystem::create_directories(Config.RootDirectory); + CreateDirectories(Config.RootDirectory); std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); if (Ec) @@ -1739,6 +1739,7 @@ GcScheduler::AppendGCLog(std::string_view Id, GcClock::TimePoint StartTime, cons { Writer << "CacheExpireTime"sv << ToDateTime(Settings.CacheExpireTime); Writer << "ProjectStoreExpireTime"sv << ToDateTime(Settings.ProjectStoreExpireTime); + Writer << "BuildStoreExpireTime"sv << ToDateTime(Settings.BuildStoreExpireTime); Writer << "CollectSmallObjects"sv << Settings.CollectSmallObjects; Writer << "IsDeleteMode"sv << Settings.IsDeleteMode; Writer << "SkipCidDelete"sv << Settings.SkipCidDelete; @@ -1849,7 +1850,7 @@ GcScheduler::GetState() const if (Result.Config.DiskReserveSize != 0) { Ec.clear(); - Result.HasDiskReserve = std::filesystem::is_regular_file(Result.Config.RootDirectory / "reserve.gc", Ec) && !Ec; + Result.HasDiskReserve = IsFile(Result.Config.RootDirectory / "reserve.gc", Ec) && !Ec; } if (Result.Status != GcSchedulerStatus::kRunning) @@ -1940,6 +1941,7 @@ GcScheduler::SchedulerThread() std::chrono::seconds LightweightGcInterval = m_Config.LightweightInterval; std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration; std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration; + std::chrono::seconds MaxBuildStoreDuration = m_Config.MaxBuildStoreDuration; uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit; bool SkipCid = false; GcVersion UseGCVersion = m_Config.UseGCVersion; @@ -1975,6 +1977,10 @@ GcScheduler::SchedulerThread() { MaxProjectStoreDuration = TriggerParams.MaxProjectStoreDuration; } + if (TriggerParams.MaxBuildStoreDuration != std::chrono::seconds::max()) + { + MaxBuildStoreDuration = TriggerParams.MaxBuildStoreDuration; + } if (TriggerParams.DiskSizeSoftLimit != 0) { DiskSizeSoftLimit = TriggerParams.DiskSizeSoftLimit; @@ -2046,6 +2052,8 @@ GcScheduler::SchedulerThread() MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration; GcClock::TimePoint ProjectStoreExpireTime = MaxProjectStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxProjectStoreDuration; + GcClock::TimePoint BuildStoreExpireTime = + MaxBuildStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxBuildStoreDuration; const GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); @@ -2102,6 +2110,10 @@ GcScheduler::SchedulerThread() { ProjectStoreExpireTime = SizeBasedExpireTime; } + if (SizeBasedExpireTime > BuildStoreExpireTime) + { + BuildStoreExpireTime = SizeBasedExpireTime; + } } std::chrono::seconds RemainingTimeUntilGc = @@ -2227,6 +2239,7 @@ GcScheduler::SchedulerThread() bool GcSuccess = CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, + BuildStoreExpireTime, DoDelete, CollectSmallObjects, SkipCid, @@ -2333,6 +2346,7 @@ GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds Time bool GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, + const GcClock::TimePoint& BuildStoreExpireTime, bool Delete, bool CollectSmallObjects, bool SkipCid, @@ -2375,12 +2389,12 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, { // We are low on disk, check if we can release our extra storage reserve, if we can't bail from doing GC auto ClaimDiskReserve = [&]() -> uint64_t { - if (!std::filesystem::is_regular_file(DiskReservePath)) + if (!IsFile(DiskReservePath)) { return 0; } - uint64_t ReclaimedSize = std::filesystem::file_size(DiskReservePath); - if (std::filesystem::remove(DiskReservePath)) + uint64_t ReclaimedSize = FileSizeFromPath(DiskReservePath); + if (RemoveFile(DiskReservePath)) { return ReclaimedSize; } @@ -2416,6 +2430,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcSettings Settings = {.CacheExpireTime = CacheExpireTime, .ProjectStoreExpireTime = ProjectStoreExpireTime, + .BuildStoreExpireTime = BuildStoreExpireTime, .CollectSmallObjects = CollectSmallObjects, .IsDeleteMode = Delete, .SkipCidDelete = SkipCid, @@ -2447,6 +2462,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, } SB.Append(fmt::format(" Cache cutoff time: {}\n", Settings.CacheExpireTime)); SB.Append(fmt::format(" Project store cutoff time: {}\n", Settings.ProjectStoreExpireTime)); + SB.Append(fmt::format(" Build store cutoff time: {}\n", Settings.BuildStoreExpireTime)); }; { @@ -2552,6 +2568,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, if (Delete) { GcClock::TimePoint KeepRangeStart = Min(CacheExpireTime, ProjectStoreExpireTime); + KeepRangeStart = Min(KeepRangeStart, BuildStoreExpireTime); m_LastGcExpireTime = KeepRangeStart; std::unique_lock Lock(m_GcMutex); m_DiskUsageWindow.KeepRange(KeepRangeStart.time_since_epoch().count(), GcClock::Duration::max().count()); diff --git a/src/zenstore/include/zenstore/accesstime.h b/src/zenstore/include/zenstore/accesstime.h new file mode 100644 index 000000000..a28dc908b --- /dev/null +++ b/src/zenstore/include/zenstore/accesstime.h @@ -0,0 +1,47 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenstore/gc.h> + +#include <gsl/gsl-lite.hpp> + +namespace zen { + +// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch +struct AccessTime +{ + explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {} + AccessTime& operator=(GcClock::Tick Tick) noexcept + { + SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed); + return *this; + } + operator GcClock::Tick() const noexcept + { + return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed))) + .count(); + } + + AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime& operator=(AccessTime&& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + AccessTime& operator=(const AccessTime& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + +private: + static uint32_t ToSeconds(GcClock::Tick Tick) + { + return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count()); + } + std::atomic_uint32_t SecondsSinceEpoch; +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 97357e5cb..0c72a13aa 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -156,7 +156,7 @@ public: void WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback); typedef std::function<void(std::span<BlockStoreLocation> Locations)> WriteChunksCallback; - void WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback); + void WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback); IoBuffer TryGetChunk(const BlockStoreLocation& Location) const; void Flush(bool ForceNewBlock); diff --git a/src/zenstore/include/zenstore/buildstore/buildstore.h b/src/zenstore/include/zenstore/buildstore/buildstore.h new file mode 100644 index 000000000..302af5f9c --- /dev/null +++ b/src/zenstore/include/zenstore/buildstore/buildstore.h @@ -0,0 +1,186 @@ + +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/blockstore.h> + +#include <zencore/iohash.h> +#include <zenstore/accesstime.h> +#include <zenstore/caslog.h> +#include <zenstore/gc.h> +#include "../compactcas.h" +#include "../filecas.h" + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +struct BuildStoreConfig +{ + std::filesystem::path RootDirectory; + uint32_t SmallBlobBlockStoreMaxBlockSize = 256 * 1024 * 1024; + uint64_t SmallBlobBlockStoreMaxBlockEmbedSize = 1 * 1024 * 1024; + uint32_t SmallBlobBlockStoreAlignement = 16; + uint32_t MetadataBlockStoreMaxBlockSize = 64 * 1024 * 1024; + uint32_t MetadataBlockStoreAlignement = 8; +}; + +class BuildStore : public GcReferencer, public GcReferenceLocker //, public GcStorage +{ +public: + explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc); + virtual ~BuildStore(); + + void PutBlob(const IoHash& BlobHashes, const IoBuffer& Payload); + IoBuffer GetBlob(const IoHash& BlobHashes); + + struct BlobExistsResult + { + bool HasBody = 0; + bool HasMetadata = 0; + }; + + std::vector<BlobExistsResult> BlobsExists(std::span<const IoHash> BlobHashes); + + void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas); + std::vector<IoBuffer> GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool); + + void Flush(); + +private: + void CompactState(); + + uint64_t ReadPayloadLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount); + uint64_t ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount); + + //////// GcReferencer + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; + + //////// GcReferenceLocker + virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; + +#pragma pack(push) +#pragma pack(1) + struct PayloadEntry + { + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value + static const uint8_t kStandalone = 0x20u; // This payload is stored as a standalone value + + uint8_t Flags = 0; + uint8_t Reserved1 = 0; + uint8_t Reserved2 = 0; + uint8_t Reserved3 = 0; + }; + static_assert(sizeof(PayloadEntry) == 4); + + struct PayloadDiskEntry + { + PayloadEntry Entry; // 4 bytes + IoHash BlobHash; // 20 bytes + }; + static_assert(sizeof(PayloadDiskEntry) == 24); + + struct MetadataEntry + { + BlockStoreLocation Location; // 12 bytes + + ZenContentType ContentType = ZenContentType::kCOUNT; // 1 byte + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value + uint8_t Flags = 0; // 1 byte + + uint8_t Reserved1 = 0; + uint8_t Reserved2 = 0; + }; + static_assert(sizeof(MetadataEntry) == 16); + + struct MetadataDiskEntry + { + MetadataEntry Entry; // 16 bytes + IoHash BlobHash; // 20 bytes + uint8_t Reserved1 = 0; + uint8_t Reserved2 = 0; + uint8_t Reserved3 = 0; + uint8_t Reserved4 = 0; + }; + static_assert(sizeof(MetadataDiskEntry) == 40); + +#pragma pack(pop) + + static bool ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason); + static bool ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::string& OutReason); + + struct PayloadIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + PayloadIndex() = default; + explicit PayloadIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const PayloadIndex& Other) const = default; + }; + + struct MetadataIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + MetadataIndex() = default; + explicit MetadataIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const MetadataIndex& Other) const = default; + }; + + struct BlobIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + BlobIndex() = default; + explicit BlobIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const BlobIndex& Other) const = default; + }; + + struct BlobEntry + { + PayloadIndex Payload; + MetadataIndex Metadata; + AccessTime LastAccessTime; + }; + static_assert(sizeof(BlobEntry) == 12); + + const BuildStoreConfig m_Config; + GcManager& m_Gc; + + RwLock m_Lock; + + std::vector<PayloadEntry> m_PayloadEntries; + std::vector<MetadataEntry> m_MetadataEntries; + + std::vector<BlobEntry> m_BlobEntries; + tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> m_BlobLookup; + + FileCasStrategy m_LargeBlobStore; + CasContainerStrategy m_SmallBlobStore; + BlockStore m_MetadataBlockStore; + + TCasLogFile<PayloadDiskEntry> m_PayloadlogFile; + TCasLogFile<MetadataDiskEntry> m_MetadatalogFile; + uint64_t m_BlobLogFlushPosition = 0; + uint64_t m_MetaLogFlushPosition = 0; + + std::unique_ptr<HashSet> m_TrackedCacheKeys; + + friend class BuildStoreGcReferenceChecker; + friend class BuildStoreGcReferencePruner; + friend class BuildStoreGcCompator; +}; + +void buildstore_forcelink(); + +} // namespace zen diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 05400c784..5a51718d3 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -5,6 +5,7 @@ #include "cacheshared.h" #include <zencore/stats.h> +#include <zenstore/accesstime.h> #include <zenstore/blockstore.h> #include <zenstore/caslog.h> diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index 521c78bb1..ef1b803de 100644 --- a/src/zenstore/include/zenstore/cache/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h @@ -72,42 +72,4 @@ struct CacheContentStats bool IsKnownBadBucketName(std::string_view BucketName); bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer); -////////////////////////////////////////////////////////////////////////// - -// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch -struct AccessTime -{ - explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {} - AccessTime& operator=(GcClock::Tick Tick) noexcept - { - SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed); - return *this; - } - operator GcClock::Tick() const noexcept - { - return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed))) - .count(); - } - - AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} - AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} - AccessTime& operator=(AccessTime&& Rhs) noexcept - { - SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - AccessTime& operator=(const AccessTime& Rhs) noexcept - { - SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - -private: - static uint32_t ToSeconds(GcClock::Tick Tick) - { - return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count()); - } - std::atomic_uint32_t SecondsSinceEpoch; -}; - } // namespace zen diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 3daae0a93..67aadef71 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -55,6 +55,7 @@ struct GcSettings { GcClock::TimePoint CacheExpireTime = GcClock::Now(); GcClock::TimePoint ProjectStoreExpireTime = GcClock::Now(); + GcClock::TimePoint BuildStoreExpireTime = GcClock::Now(); bool CollectSmallObjects = false; bool IsDeleteMode = false; bool SkipCidDelete = false; @@ -412,6 +413,7 @@ struct GcSchedulerConfig std::chrono::seconds Interval{}; std::chrono::seconds MaxCacheDuration{86400}; std::chrono::seconds MaxProjectStoreDuration{604800}; + std::chrono::seconds MaxBuildStoreDuration{604800}; bool CollectSmallObjects = true; bool Enabled = true; uint64_t DiskReserveSize = 1ul << 28; @@ -496,6 +498,7 @@ public: bool CollectSmallObjects = false; std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max(); std::chrono::seconds MaxProjectStoreDuration = std::chrono::seconds::max(); + std::chrono::seconds MaxBuildStoreDuration = std::chrono::seconds::max(); uint64_t DiskSizeSoftLimit = 0; bool SkipCid = false; bool SkipDelete = false; @@ -528,6 +531,7 @@ private: void SchedulerThread(); bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, + const GcClock::TimePoint& BuildStoreExpireTime, bool Delete, bool CollectSmallObjects, bool SkipCid, diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp index 02a83d2a6..0ca2adab2 100644 --- a/src/zenstore/workspaces.cpp +++ b/src/zenstore/workspaces.cpp @@ -444,7 +444,7 @@ Workspaces::RefreshWorkspaceShares(const Oid& WorkspaceId) { const std::filesystem::path& RootPath = Workspace->GetConfig().RootPath; std::filesystem::path ConfigPath = RootPath / WorkspaceConfigName; - if (std::filesystem::exists(ConfigPath)) + if (IsFile(ConfigPath)) { std::string Error; std::vector<Workspaces::WorkspaceShareConfiguration> WorkspaceShares = ReadWorkspaceConfig(m_Log, RootPath, Error); @@ -458,7 +458,7 @@ Workspaces::RefreshWorkspaceShares(const Oid& WorkspaceId) { const std::filesystem::path& SharePath = Configuration.SharePath; - if (std::filesystem::is_directory(RootPath / SharePath)) + if (IsDir(RootPath / SharePath)) { DeletedShares.erase(Configuration.Id); @@ -808,7 +808,7 @@ Workspaces::ReadConfig(const LoggerRef& InLog, const std::filesystem::path& Work ZEN_DEBUG("Reading workspaces state from {}", WorkspaceStatePath); const std::filesystem::path ConfigPath = WorkspaceStatePath / WorkspacesConfigName; - if (std::filesystem::exists(ConfigPath)) + if (IsFile(ConfigPath)) { std::vector<Workspaces::WorkspaceConfiguration> Workspaces = WorkspacesFromJson(IoBufferBuilder::MakeFromFile(ConfigPath), OutError); @@ -847,7 +847,7 @@ Workspaces::ReadWorkspaceConfig(const LoggerRef& InLog, const std::filesystem::p ZEN_DEBUG("Reading workspace state from {}", WorkspaceRoot); std::filesystem::path ConfigPath = WorkspaceRoot / WorkspaceConfigName; - if (std::filesystem::exists(ConfigPath)) + if (IsFile(ConfigPath)) { std::vector<Workspaces::WorkspaceShareConfiguration> WorkspaceShares = WorkspaceSharesFromJson(IoBufferBuilder::MakeFromFile(ConfigPath), OutError); @@ -886,7 +886,7 @@ Workspaces::AddWorkspace(const LoggerRef& Log, const std::filesystem::path& Work { throw std::invalid_argument(fmt::format("invalid root path '{}' for workspace {}", Configuration.RootPath, Configuration.Id)); } - if (!std::filesystem::is_directory(Configuration.RootPath)) + if (!IsDir(Configuration.RootPath)) { throw std::invalid_argument( fmt::format("workspace root path '{}' does not exist for workspace '{}'", Configuration.RootPath, Configuration.Id)); @@ -965,7 +965,7 @@ Workspaces::AddWorkspaceShare(const LoggerRef& Log, throw std::invalid_argument( fmt::format("workspace share path '{}' is not a sub-path of workspace path '{}'", Configuration.SharePath, WorkspaceRoot)); } - if (!std::filesystem::is_directory(WorkspaceRoot / Configuration.SharePath)) + if (!IsDir(WorkspaceRoot / Configuration.SharePath)) { throw std::invalid_argument( fmt::format("workspace share path '{}' does not exist in workspace path '{}'", Configuration.SharePath, WorkspaceRoot)); @@ -1244,7 +1244,7 @@ Workspaces::FindWorkspaceShare(const Oid& WorkspaceId, const Oid& ShareId, bool const Workspaces::WorkspaceConfiguration& WorkspaceConfig = Workspace->GetConfig(); const Workspaces::WorkspaceShareConfiguration& ShareConfig = Share->GetConfig(); std::filesystem::path FullSharePath = WorkspaceConfig.RootPath / ShareConfig.SharePath; - if (std::filesystem::is_directory(FullSharePath)) + if (IsDir(FullSharePath)) { if (ForceRefresh || !Share->IsInitialized()) { @@ -1306,18 +1306,18 @@ namespace { std::filesystem::path EmptyFolder(RootPath / "empty_folder"); std::filesystem::path FirstFolder(RootPath / "first_folder"); - std::filesystem::create_directory(FirstFolder); + CreateDirectories(FirstFolder); Result.push_back(std::make_pair(FirstFolder / "first_folder_blob1.bin", CreateRandomBlob(22))); Result.push_back(std::make_pair(FirstFolder / "first_folder_blob2.bin", CreateRandomBlob(122))); std::filesystem::path SecondFolder(RootPath / "second_folder"); - std::filesystem::create_directory(SecondFolder); + CreateDirectories(SecondFolder); Result.push_back(std::make_pair(SecondFolder / "second_folder_blob1.bin", CreateRandomBlob(522))); Result.push_back(std::make_pair(SecondFolder / "second_folder_blob2.bin", CreateRandomBlob(122))); Result.push_back(std::make_pair(SecondFolder / "second_folder_blob3.bin", CreateRandomBlob(225))); std::filesystem::path SecondFolderChild(SecondFolder / "child_in_second"); - std::filesystem::create_directory(SecondFolderChild); + CreateDirectories(SecondFolderChild); Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob1.bin", CreateRandomBlob(622))); for (const auto& It : Result) @@ -1365,13 +1365,13 @@ TEST_CASE("workspaces.scanfolder") Structure->IterateEntries([&](const Oid& Id, const FolderStructure::FileEntry& Entry) { std::filesystem::path AbsPath = RootPath / Entry.RelativePath; - CHECK(std::filesystem::is_regular_file(AbsPath)); - CHECK(std::filesystem::file_size(AbsPath) == Entry.Size); + CHECK(IsFile(AbsPath)); + CHECK(FileSizeFromPath(AbsPath) == Entry.Size); const FolderStructure::FileEntry* FindEntry = Structure->FindEntry(Id); CHECK(FindEntry); std::filesystem::path Path = RootPath / FindEntry->RelativePath; CHECK(AbsPath == Path); - CHECK(std::filesystem::file_size(AbsPath) == FindEntry->Size); + CHECK(FileSizeFromPath(AbsPath) == FindEntry->Size); }); } |