aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenstore
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/blockstore.cpp1312
-rw-r--r--src/zenstore/cas.cpp355
-rw-r--r--src/zenstore/cas.h67
-rw-r--r--src/zenstore/caslog.cpp236
-rw-r--r--src/zenstore/cidstore.cpp125
-rw-r--r--src/zenstore/compactcas.cpp1511
-rw-r--r--src/zenstore/compactcas.h95
-rw-r--r--src/zenstore/filecas.cpp1452
-rw-r--r--src/zenstore/filecas.h102
-rw-r--r--src/zenstore/gc.cpp1312
-rw-r--r--src/zenstore/hashkeyset.cpp60
-rw-r--r--src/zenstore/include/zenstore/blockstore.h175
-rw-r--r--src/zenstore/include/zenstore/caslog.h91
-rw-r--r--src/zenstore/include/zenstore/cidstore.h87
-rw-r--r--src/zenstore/include/zenstore/gc.h242
-rw-r--r--src/zenstore/include/zenstore/hashkeyset.h54
-rw-r--r--src/zenstore/include/zenstore/scrubcontext.h41
-rw-r--r--src/zenstore/include/zenstore/zenstore.h13
-rw-r--r--src/zenstore/xmake.lua9
-rw-r--r--src/zenstore/zenstore.cpp32
20 files changed, 7371 insertions, 0 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
new file mode 100644
index 000000000..5dfa10c91
--- /dev/null
+++ b/src/zenstore/blockstore.cpp
@@ -0,0 +1,1312 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/blockstore.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+
+#include <algorithm>
+
+#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <random>
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+// Remembers where the block file lives on disk. Nothing is opened here;
+// call Open() or Create() before using the file.
+BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath)
+: m_Path(BlockPath)
+{
+}
+
+// Releases this object's IoBuffer first and then detaches the BasicFile.
+BlockStoreFile::~BlockStoreFile()
+{
+    // Drop our own reference to the buffer before letting go of the file object
+    m_IoBuffer = IoBuffer();
+
+    m_File.Detach();
+}
+
+// Returns the path this block file was constructed with.
+const std::filesystem::path&
+BlockStoreFile::GetPath() const
+{
+    return m_Path;
+}
+
+// Opens the file at m_Path in BasicFile::Mode::kDelete and wraps its current
+// contents (offset 0 up to the current file size) in m_IoBuffer.
+void
+BlockStoreFile::Open()
+{
+    m_File.Open(m_Path, BasicFile::Mode::kDelete);
+
+    void* const NativeHandle = m_File.Handle();
+    m_IoBuffer               = IoBuffer(IoBuffer::File, NativeHandle, 0, m_File.FileSize());
+}
+
+// Creates the block file at m_Path (BasicFile::Mode::kTruncateDelete), creating
+// the parent directory first when it is missing.
+//
+// InitialSize - size of the range covered by m_IoBuffer, starting at offset 0
+void
+BlockStoreFile::Create(uint64_t InitialSize)
+{
+    const auto Directory = m_Path.parent_path();
+    if (!std::filesystem::is_directory(Directory))
+    {
+        CreateDirectories(Directory);
+    }
+
+    m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete);
+    void* const NativeHandle = m_File.Handle();
+
+    // The buffer deliberately covers more than the file currently holds: the file
+    // grows as chunks are written, and sub-buffers over any written range must be
+    // creatable later on.
+    m_IoBuffer = IoBuffer(IoBuffer::File, NativeHandle, 0, InitialSize);
+}
+
+// Returns the size currently reported by the underlying BasicFile.
+uint64_t
+BlockStoreFile::FileSize()
+{
+    return m_File.FileSize();
+}
+
+// Forwards the delete-on-close request to the IoBuffer that wraps the file.
+void
+BlockStoreFile::MarkAsDeleteOnClose()
+{
+    m_IoBuffer.MarkAsDeleteOnClose();
+}
+
+// Returns a sub-buffer of m_IoBuffer covering Size bytes starting at Offset.
+IoBuffer
+BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size)
+{
+    return IoBuffer(m_IoBuffer, Offset, Size);
+}
+
+// Reads Size bytes from the file, starting at FileOffset, into Data.
+void
+BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset)
+{
+    m_File.Read(Data, Size, FileOffset);
+}
+
+// Writes Size bytes from Data to the file at FileOffset.
+void
+BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
+{
+    m_File.Write(Data, Size, FileOffset);
+}
+
+// Flushes the underlying BasicFile.
+void
+BlockStoreFile::Flush()
+{
+    m_File.Flush();
+}
+
+// Gives direct access to the BasicFile backing this block.
+BasicFile&
+BlockStoreFile::GetBasicFile()
+{
+    return m_File;
+}
+
+// Hands the byte range [FileOffset, FileOffset + Size) to BasicFile::StreamByteRange,
+// which invokes ChunkFun with the data. ChunkFun is moved into the call.
+void
+BlockStoreFile::StreamByteRange(uint64_t                                                  FileOffset,
+                                uint64_t                                                  Size,
+                                std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
+{
+    m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
+}
+
+constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024;
+
+// (Re)initializes the store rooted at BlocksBasePath.
+//
+// If BlocksBasePath exists it is scanned recursively for block files: regular files
+// whose extension matches GetBlockFileExtension() and whose stem parses as a hex
+// block index. A block that no entry of KnownLocations refers to is removed from
+// disk. Every other block is opened, registered in m_ChunkBlocks, and has its file
+// size added to m_TotalSize. If the directory does not exist it is created.
+//
+// BlocksBasePath - root directory of the block files
+// MaxBlockSize   - size limit of a single block file, must be > 0
+// MaxBlockCount  - limit for the number of blocks, must be > 0 and a power of two
+//                  (block indexes wrap using MaxBlockCount - 1 as a bit mask)
+// KnownLocations - locations of all chunks the caller still references
+void
+BlockStore::Initialize(const std::filesystem::path&           BlocksBasePath,
+                       uint64_t                               MaxBlockSize,
+                       uint64_t                               MaxBlockCount,
+                       const std::vector<BlockStoreLocation>& KnownLocations)
+{
+    ZEN_ASSERT(MaxBlockSize > 0);
+    ZEN_ASSERT(MaxBlockCount > 0);
+    ZEN_ASSERT(IsPow2(MaxBlockCount));
+
+    m_TotalSize      = 0;
+    m_BlocksBasePath = BlocksBasePath;
+    m_MaxBlockSize   = MaxBlockSize;
+    // The limit was validated above but previously never stored, so the value
+    // requested by the caller had no effect on block allocation.
+    m_MaxBlockCount = MaxBlockCount;
+
+    m_ChunkBlocks.clear();
+
+    // Set of block indexes that still hold referenced chunks
+    std::unordered_set<uint32_t> KnownBlocks;
+    for (const auto& Entry : KnownLocations)
+    {
+        KnownBlocks.insert(Entry.BlockIndex);
+    }
+
+    if (std::filesystem::is_directory(m_BlocksBasePath))
+    {
+        // Breadth-first walk; FoldersToScan grows while we iterate, so it is indexed
+        // rather than iterated by range.
+        std::vector<std::filesystem::path> FoldersToScan;
+        FoldersToScan.push_back(m_BlocksBasePath);
+        size_t FolderOffset = 0;
+        while (FolderOffset < FoldersToScan.size())
+        {
+            for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
+            {
+                if (Entry.is_directory())
+                {
+                    FoldersToScan.push_back(Entry.path());
+                    continue;
+                }
+                if (Entry.is_regular_file())
+                {
+                    const std::filesystem::path Path = Entry.path();
+                    if (Path.extension() != GetBlockFileExtension())
+                    {
+                        continue;
+                    }
+                    std::string FileName = PathToUtf8(Path.stem());
+                    uint32_t    BlockIndex;
+                    bool        OK = ParseHexNumber(FileName, BlockIndex);
+                    if (!OK)
+                    {
+                        // Not named like a block file, leave it alone
+                        continue;
+                    }
+                    if (!KnownBlocks.contains(BlockIndex))
+                    {
+                        // Nothing references this block any more, remove it from disk
+                        ZEN_DEBUG("removing unused block at '{}'", Path);
+                        std::error_code Ec;
+                        std::filesystem::remove(Path, Ec);
+                        if (Ec)
+                        {
+                            ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
+                        }
+                        continue;
+                    }
+                    Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)};
+                    BlockFile->Open();
+                    m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed);
+                    m_ChunkBlocks[BlockIndex] = BlockFile;
+                }
+            }
+            ++FolderOffset;
+        }
+    }
+    else
+    {
+        CreateDirectories(m_BlocksBasePath);
+    }
+}
+
+// Resets the store under the exclusive insert lock: forgets the current write
+// block and insert position, drops every registered block and clears the base path.
+void
+BlockStore::Close()
+{
+    RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+
+    // Writer state
+    m_WriteBlock          = nullptr;
+    m_CurrentInsertOffset = 0;
+    m_WriteBlockIndex     = 0;
+
+    // Block registry and location
+    m_ChunkBlocks.clear();
+    m_BlocksBasePath.clear();
+}
+
+// Appends one chunk to the current write block and reports where it ended up.
+//
+// A new block file is created when there is no current write block or the chunk
+// does not fit in the remaining space of the current one. The insert position is
+// reserved under the exclusive insert lock; the actual file write and the callback
+// run outside the lock. While they run, the block index is listed in
+// m_ActiveWriteBlocks.
+//
+// Data      - chunk payload, must not be null
+// Size      - payload size in bytes, must be > 0 and <= m_MaxBlockSize
+// Alignment - the next insert offset is rounded up to a multiple of this, must be > 0
+// Callback  - receives the BlockStoreLocation of the written chunk
+//
+// Throws std::runtime_error when every block index is already in use.
+void
+BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback)
+{
+    ZEN_ASSERT(Data != nullptr);
+    ZEN_ASSERT(Size > 0u);
+    ZEN_ASSERT(Size <= m_MaxBlockSize);
+    ZEN_ASSERT(Alignment > 0u);
+
+    RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+
+    uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+    bool     IsWriting       = !!m_WriteBlock;
+    if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize)
+    {
+        if (m_WriteBlock)
+        {
+            m_WriteBlock = nullptr;
+        }
+
+        if (m_ChunkBlocks.size() == m_MaxBlockCount)
+        {
+            throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
+        }
+
+        // Step past the block we were writing to. The result is masked so the
+        // candidate index always stays inside [0, m_MaxBlockCount), also when the
+        // previous block was the last valid index.
+        WriteBlockIndex = (WriteBlockIndex + (IsWriting ? 1 : 0)) & (m_MaxBlockCount - 1);
+        while (m_ChunkBlocks.contains(WriteBlockIndex))
+        {
+            WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+        }
+
+        std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+
+        Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath));
+        NewBlockFile->Create(m_MaxBlockSize);
+
+        m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
+        m_WriteBlock                   = NewBlockFile;
+        m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+        m_CurrentInsertOffset = 0;
+    }
+
+    // Reserve our range in the block; the next writer starts at the aligned end
+    uint64_t InsertOffset         = m_CurrentInsertOffset;
+    m_CurrentInsertOffset         = RoundUp(InsertOffset + Size, Alignment);
+    uint64_t            AlignedWriteSize = m_CurrentInsertOffset - InsertOffset;
+    Ref<BlockStoreFile> WriteBlock       = m_WriteBlock;
+    m_ActiveWriteBlocks.push_back(WriteBlockIndex);
+    InsertLock.ReleaseNow();
+
+    // Always unregister the in-flight write, also when the file write or the
+    // callback throws; otherwise the block would be reported as active forever.
+    const auto UnregisterActiveWrite = MakeGuard([&] {
+        RwLock::ExclusiveLockScope _(m_InsertLock);
+        auto                       ActiveIt = std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex);
+        if (ActiveIt != m_ActiveWriteBlocks.end())
+        {
+            m_ActiveWriteBlocks.erase(ActiveIt);
+        }
+    });
+
+    WriteBlock->Write(Data, Size, InsertOffset);
+    m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed);
+
+    Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size});
+}
+
+// Captures, under the shared insert lock, what ReclaimSpace needs to know about
+// ongoing writes: the indexes of blocks with in-flight writes, the index of the
+// current write block (if there is one) and the number of registered blocks.
+BlockStore::ReclaimSnapshotState
+BlockStore::GetReclaimSnapshotState()
+{
+    ReclaimSnapshotState       Snapshot;
+    RwLock::SharedLockScope    _(m_InsertLock);
+
+    Snapshot.m_ActiveWriteBlocks.insert(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end());
+    if (m_WriteBlock)
+    {
+        Snapshot.m_ActiveWriteBlocks.insert(m_WriteBlockIndex);
+    }
+    Snapshot.BlockCount = m_ChunkBlocks.size();
+
+    return Snapshot;
+}
+
+// Looks up the block referenced by Location and returns a buffer over the chunk.
+// Returns an empty IoBuffer when the block index is unknown or its entry is null.
+IoBuffer
+BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
+{
+    RwLock::SharedLockScope InsertLock(m_InsertLock);
+
+    const auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex);
+    if (BlockIt == m_ChunkBlocks.end())
+    {
+        return IoBuffer();
+    }
+
+    const Ref<BlockStoreFile>& Block = BlockIt->second;
+    if (!Block)
+    {
+        return IoBuffer();
+    }
+
+    return Block->GetChunk(Location.Offset, Location.Size);
+}
+
+// Retires the current write block if anything has been written to it, so that the
+// next WriteChunk call starts a fresh block. The write block index is advanced by
+// one, wrapping at m_MaxBlockCount.
+void
+BlockStore::Flush()
+{
+    RwLock::ExclusiveLockScope _(m_InsertLock);
+
+    if (m_CurrentInsertOffset == 0)
+    {
+        // Nothing written since the block was started (or no block at all)
+        return;
+    }
+
+    const uint32_t CurrentIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+    const uint32_t NextIndex    = (CurrentIndex + 1) & (m_MaxBlockCount - 1);
+
+    m_WriteBlock = nullptr;
+    m_WriteBlockIndex.store(NextIndex, std::memory_order_release);
+    m_CurrentInsertOffset = 0;
+}
+
+// Garbage collects block files.
+//
+// Every chunk in ChunkLocations is classified per block as "keep" (its index is in
+// KeepChunkIndexes) or "delete". Blocks listed in Snapshot.m_ActiveWriteBlocks are
+// skipped entirely. A block with at least one deleted chunk is rewritten: its kept
+// chunks are copied into newly allocated blocks and the old block file is marked
+// for deletion. A block without kept chunks is simply marked for deletion.
+//
+// Snapshot            - state captured by GetReclaimSnapshotState()
+// ChunkLocations      - location of every chunk known to the caller
+// KeepChunkIndexes    - indexes into ChunkLocations of the chunks to retain
+// PayloadAlignment    - alignment of chunk offsets inside rewritten blocks
+// DryRun              - when true only the statistics are logged, nothing is changed
+// ChangeCallback      - called with (moved chunks and their new locations, indexes
+//                       of deleted chunks) before the old block is released
+// DiskReserveCallback - called when free disk space is below m_MaxBlockSize; its
+//                       return value is added to the free space before re-checking
+//
+// Exceptions derived from std::exception are caught and logged; an incomplete new
+// block is marked for deletion in that case.
+void
+BlockStore::ReclaimSpace(const ReclaimSnapshotState&            Snapshot,
+                         const std::vector<BlockStoreLocation>& ChunkLocations,
+                         const ChunkIndexArray&                 KeepChunkIndexes,
+                         uint64_t                               PayloadAlignment,
+                         bool                                   DryRun,
+                         const ReclaimCallback&                 ChangeCallback,
+                         const ClaimDiskReserveCallback&        DiskReserveCallback)
+{
+    if (ChunkLocations.empty())
+    {
+        return;
+    }
+    // Time spent holding the shared lock (Write*) and the exclusive lock (Read*)
+    uint64_t WriteBlockTimeUs        = 0;
+    uint64_t WriteBlockLongestTimeUs = 0;
+    uint64_t ReadBlockTimeUs         = 0;
+    uint64_t ReadBlockLongestTimeUs  = 0;
+    uint64_t TotalChunkCount         = ChunkLocations.size();
+    uint64_t DeletedSize             = 0;
+    uint64_t OldTotalSize            = 0;
+    uint64_t NewTotalSize            = 0;
+
+    uint64_t MovedCount   = 0;
+    uint64_t DeletedCount = 0;
+
+    Stopwatch  TotalTimer;
+    const auto _ = MakeGuard([&] {
+        // The lock timings are accumulated in microseconds; scale them for the
+        // nanosecond formatter.
+        ZEN_DEBUG(
+            "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
+            "{} "
+            "of {} "
+            "chunks ({}).",
+            m_BlocksBasePath,
+            NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+            NiceLatencyNs(WriteBlockTimeUs * 1000),
+            NiceLatencyNs(WriteBlockLongestTimeUs * 1000),
+            NiceLatencyNs(ReadBlockTimeUs * 1000),
+            NiceLatencyNs(ReadBlockLongestTimeUs * 1000),
+            NiceBytes(DeletedSize),
+            DeletedCount,
+            MovedCount,
+            TotalChunkCount,
+            NiceBytes(OldTotalSize));
+    });
+
+    size_t BlockCount = Snapshot.BlockCount;
+    if (BlockCount == 0)
+    {
+        ZEN_DEBUG("garbage collect for '{}' SKIPPED, no blocks to process", m_BlocksBasePath);
+        return;
+    }
+
+    std::unordered_set<size_t> KeepChunkMap;
+    KeepChunkMap.reserve(KeepChunkIndexes.size());
+    for (size_t KeepChunkIndex : KeepChunkIndexes)
+    {
+        KeepChunkMap.insert(KeepChunkIndex);
+    }
+
+    // BlockKeepChunks / BlockDeleteChunks are parallel arrays; the map translates a
+    // block index into the slot shared by both.
+    std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
+    std::vector<ChunkIndexArray>         BlockKeepChunks;
+    std::vector<ChunkIndexArray>         BlockDeleteChunks;
+
+    BlockIndexToChunkMapIndex.reserve(BlockCount);
+    BlockKeepChunks.reserve(BlockCount);
+    BlockDeleteChunks.reserve(BlockCount);
+    size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
+
+    size_t DeleteCount = 0;
+    for (size_t Index = 0; Index < TotalChunkCount; ++Index)
+    {
+        const BlockStoreLocation& Location = ChunkLocations[Index];
+        OldTotalSize += Location.Size;
+        if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex))
+        {
+            // Block was being written to when the snapshot was taken, leave it alone
+            continue;
+        }
+
+        auto   BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex);
+        size_t ChunkMapIndex = 0;
+        if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
+        {
+            ChunkMapIndex                                  = BlockKeepChunks.size();
+            BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex;
+            BlockKeepChunks.resize(ChunkMapIndex + 1);
+            BlockKeepChunks.back().reserve(GuesstimateCountPerBlock);
+            BlockDeleteChunks.resize(ChunkMapIndex + 1);
+            BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock);
+        }
+        else
+        {
+            ChunkMapIndex = BlockIndexPtr->second;
+        }
+
+        if (KeepChunkMap.contains(Index))
+        {
+            ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex];
+            IndexMap.push_back(Index);
+            NewTotalSize += Location.Size;
+            continue;
+        }
+        ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex];
+        IndexMap.push_back(Index);
+        DeleteCount++;
+    }
+
+    // Only blocks that lose at least one chunk need any work
+    std::unordered_set<uint32_t> BlocksToReWrite;
+    BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
+    for (const auto& Entry : BlockIndexToChunkMapIndex)
+    {
+        uint32_t               BlockIndex    = Entry.first;
+        size_t                 ChunkMapIndex = Entry.second;
+        const ChunkIndexArray& ChunkMap      = BlockDeleteChunks[ChunkMapIndex];
+        if (ChunkMap.empty())
+        {
+            continue;
+        }
+        BlocksToReWrite.insert(BlockIndex);
+    }
+
+    if (DryRun)
+    {
+        ZEN_DEBUG("garbage collect for '{}' DISABLED, found {} {} chunks of total {} {}",
+                  m_BlocksBasePath,
+                  DeleteCount,
+                  NiceBytes(OldTotalSize - NewTotalSize),
+                  TotalChunkCount,
+                  OldTotalSize);
+        return;
+    }
+
+    Ref<BlockStoreFile> NewBlockFile;
+    try
+    {
+        uint64_t WriteOffset   = 0;
+        uint32_t NewBlockIndex = 0;
+        for (uint32_t BlockIndex : BlocksToReWrite)
+        {
+            const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
+
+            Ref<BlockStoreFile> OldBlockFile;
+            {
+                RwLock::SharedLockScope _i(m_InsertLock);
+                Stopwatch               Timer;
+                const auto              TimerGuard = MakeGuard([&] {
+                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                    WriteBlockTimeUs += ElapsedUs;
+                    WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+                });
+                // Use find() rather than operator[]: only the shared lock is held
+                // here, so the map must not be modified by a missing key.
+                if (auto BlockIt = m_ChunkBlocks.find(BlockIndex); BlockIt != m_ChunkBlocks.end())
+                {
+                    OldBlockFile = BlockIt->second;
+                }
+            }
+
+            if (!OldBlockFile)
+            {
+                // If the block file pointed to does not exist, move them all to deleted list
+                BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(),
+                                                        BlockKeepChunks[ChunkMapIndex].begin(),
+                                                        BlockKeepChunks[ChunkMapIndex].end());
+                BlockKeepChunks[ChunkMapIndex].clear();
+            }
+
+            const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex];
+            if (KeepMap.empty())
+            {
+                // Nothing to preserve: report the deletions and drop the whole block
+                const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
+                for (size_t DeleteIndex : DeleteMap)
+                {
+                    DeletedSize += ChunkLocations[DeleteIndex].Size;
+                }
+                ChangeCallback({}, DeleteMap);
+                DeletedCount += DeleteMap.size();
+                {
+                    RwLock::ExclusiveLockScope _i(m_InsertLock);
+                    Stopwatch                  Timer;
+                    const auto                 TimerGuard = MakeGuard([&] {
+                        uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                        ReadBlockTimeUs += ElapsedUs;
+                        ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                    });
+                    if (OldBlockFile)
+                    {
+                        m_ChunkBlocks[BlockIndex] = nullptr;
+                        ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+                        m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
+                        OldBlockFile->MarkAsDeleteOnClose();
+                    }
+                }
+                continue;
+            }
+
+            ZEN_ASSERT(OldBlockFile);
+
+            MovedChunksArray     MovedChunks;
+            std::vector<uint8_t> Chunk;
+            for (const size_t& ChunkIndex : KeepMap)
+            {
+                const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
+                Chunk.resize(ChunkLocation.Size);
+                OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+
+                if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
+                {
+                    // Need a (new) destination block
+                    uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+
+                    if (NewBlockFile)
+                    {
+                        NewBlockFile->Flush();
+                        NewBlockFile = nullptr;
+                    }
+                    {
+                        // Publish what has been moved so far. Count only the chunks
+                        // actually reported here; the remainder of KeepMap is counted
+                        // once the whole block has been processed.
+                        ChangeCallback(MovedChunks, {});
+                        MovedCount += MovedChunks.size();
+                        MovedChunks.clear();
+                        RwLock::ExclusiveLockScope LockScope(m_InsertLock);
+                        Stopwatch                  Timer;
+                        const auto                 TimerGuard = MakeGuard([&] {
+                            uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                            ReadBlockTimeUs += ElapsedUs;
+                            ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                        });
+                        if (m_ChunkBlocks.size() == m_MaxBlockCount)
+                        {
+                            // Report the limit that is actually enforced
+                            ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
+                                      m_BlocksBasePath,
+                                      static_cast<uint64_t>(m_MaxBlockCount));
+                            return;
+                        }
+                        while (m_ChunkBlocks.contains(NextBlockIndex))
+                        {
+                            NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1);
+                        }
+                        std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+                        NewBlockFile                       = new BlockStoreFile(NewBlockPath);
+                        // Reserve the index right away so writers cannot pick it
+                        m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+                    }
+
+                    std::error_code Error;
+                    DiskSpace       Space = DiskSpaceInfo(m_BlocksBasePath, Error);
+                    if (Error)
+                    {
+                        ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
+                        // Release the index reserved above; the file was never created
+                        RwLock::ExclusiveLockScope LockScope(m_InsertLock);
+                        Stopwatch                  Timer;
+                        const auto                 TimerGuard = MakeGuard([&] {
+                            uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                            ReadBlockTimeUs += ElapsedUs;
+                            ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                        });
+                        m_ChunkBlocks.erase(NextBlockIndex);
+                        return;
+                    }
+                    if (Space.Free < m_MaxBlockSize)
+                    {
+                        uint64_t ReclaimedSpace = DiskReserveCallback();
+                        if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+                        {
+                            ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
+                                     m_BlocksBasePath,
+                                     m_MaxBlockSize,
+                                     NiceBytes(Space.Free + ReclaimedSpace));
+                            RwLock::ExclusiveLockScope LockScope(m_InsertLock);
+                            Stopwatch                  Timer;
+                            const auto                 TimerGuard = MakeGuard([&] {
+                                uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                                ReadBlockTimeUs += ElapsedUs;
+                                ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                            });
+                            m_ChunkBlocks.erase(NextBlockIndex);
+                            return;
+                        }
+
+                        ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+                                 m_BlocksBasePath,
+                                 ReclaimedSpace,
+                                 NiceBytes(Space.Free + ReclaimedSpace));
+                    }
+                    NewBlockFile->Create(m_MaxBlockSize);
+                    NewBlockIndex = NextBlockIndex;
+                    WriteOffset   = 0;
+                }
+
+                NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+                MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}});
+                uint64_t OldOffset = WriteOffset;
+                WriteOffset        = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
+                m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed);
+            }
+            Chunk.clear();
+            if (NewBlockFile)
+            {
+                NewBlockFile->Flush();
+                NewBlockFile = nullptr;
+            }
+
+            const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
+            for (size_t DeleteIndex : DeleteMap)
+            {
+                DeletedSize += ChunkLocations[DeleteIndex].Size;
+            }
+
+            // Report the remaining moves together with the deletions, then retire
+            // the old block.
+            ChangeCallback(MovedChunks, DeleteMap);
+            MovedCount += MovedChunks.size();
+            DeletedCount += DeleteMap.size();
+            MovedChunks.clear();
+            {
+                RwLock::ExclusiveLockScope LockScope(m_InsertLock);
+                Stopwatch                  Timer;
+                const auto                 TimerGuard = MakeGuard([&] {
+                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                    ReadBlockTimeUs += ElapsedUs;
+                    ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                });
+                m_ChunkBlocks[BlockIndex] = nullptr;
+                ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+                m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
+                OldBlockFile->MarkAsDeleteOnClose();
+            }
+        }
+    }
+    catch (const std::exception& ex)
+    {
+        ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what());
+        if (NewBlockFile)
+        {
+            ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
+            m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed);
+            NewBlockFile->MarkAsDeleteOnClose();
+        }
+    }
+}
+
+// Visits every chunk in ChunkLocations, ordered by block index and then by offset,
+// while holding the shared insert lock.
+//
+// Neighbouring chunks of one block that together fit in ScrubSmallChunkWindowSize
+// are fetched with a single read and passed to SmallSizeCallback one by one. A chunk
+// that does not fit in the window is passed to LargeSizeCallback together with its
+// block file. Chunks that cannot be served (unknown block, zero size, or a range
+// beyond the end of the block file) are reported as SmallSizeCallback(Index, nullptr, 0).
+//
+// The chunk index given to the callbacks is the position in ChunkLocations.
+void
+BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
+                          const IterateChunksSmallSizeCallback&  SmallSizeCallback,
+                          const IterateChunksLargeSizeCallback&  LargeSizeCallback)
+{
+    // Indirection array so ChunkLocations can be walked in (block, offset) order
+    std::vector<size_t> LocationIndexes;
+    LocationIndexes.reserve(ChunkLocations.size());
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
+    {
+        LocationIndexes.push_back(ChunkIndex);
+    }
+    std::sort(LocationIndexes.begin(), LocationIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
+        const BlockStoreLocation& LocationA = ChunkLocations[IndexA];
+        const BlockStoreLocation& LocationB = ChunkLocations[IndexB];
+        if (LocationA.BlockIndex < LocationB.BlockIndex)
+        {
+            return true;
+        }
+        else if (LocationA.BlockIndex > LocationB.BlockIndex)
+        {
+            return false;
+        }
+        return LocationA.Offset < LocationB.Offset;
+    });
+
+    IoBuffer ReadBuffer{ScrubSmallChunkWindowSize};
+    void*    BufferBase = ReadBuffer.MutableData();
+
+    RwLock::SharedLockScope _(m_InsertLock);
+
+    // Counts how many consecutive entries, starting at StartIndexOffset, belong to
+    // the same block and end within one read window measured from the first entry's
+    // offset. Returns 0 when even the first entry does not fit.
+    auto GetNextRange = [&](size_t StartIndexOffset) {
+        size_t                    ChunkCount    = 0;
+        size_t                    StartIndex    = LocationIndexes[StartIndexOffset];
+        const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex];
+        uint64_t                  StartOffset   = StartLocation.Offset;
+        while (StartIndexOffset + ChunkCount < LocationIndexes.size())
+        {
+            size_t                    NextIndex = LocationIndexes[StartIndexOffset + ChunkCount];
+            const BlockStoreLocation& Location  = ChunkLocations[NextIndex];
+            if (Location.BlockIndex != StartLocation.BlockIndex)
+            {
+                break;
+            }
+            if ((Location.Offset + Location.Size) - StartOffset > ScrubSmallChunkWindowSize)
+            {
+                break;
+            }
+            ++ChunkCount;
+        }
+        return ChunkCount;
+    };
+
+    size_t LocationIndexOffset = 0;
+    while (LocationIndexOffset < LocationIndexes.size())
+    {
+        size_t                    ChunkIndex    = LocationIndexes[LocationIndexOffset];
+        const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
+
+        // Look the block up with find(): only the shared lock is held, so a block
+        // index that is not registered must not be inserted into m_ChunkBlocks.
+        Ref<BlockStoreFile> BlockFile;
+        if (auto BlockIt = m_ChunkBlocks.find(FirstLocation.BlockIndex); BlockIt != m_ChunkBlocks.end())
+        {
+            BlockFile = BlockIt->second;
+        }
+        if (!BlockFile)
+        {
+            // Block is gone: report every chunk that refers to it as missing
+            while (ChunkLocations[ChunkIndex].BlockIndex == FirstLocation.BlockIndex)
+            {
+                SmallSizeCallback(ChunkIndex, nullptr, 0);
+                LocationIndexOffset++;
+                if (LocationIndexOffset == LocationIndexes.size())
+                {
+                    break;
+                }
+                ChunkIndex = LocationIndexes[LocationIndexOffset];
+            }
+            continue;
+        }
+        size_t BlockSize  = BlockFile->FileSize();
+        size_t RangeCount = GetNextRange(LocationIndexOffset);
+        if (RangeCount > 0)
+        {
+            // One read covering the first chunk's start up to the last chunk's end
+            size_t                    LastChunkIndex = LocationIndexes[LocationIndexOffset + RangeCount - 1];
+            const BlockStoreLocation& LastLocation   = ChunkLocations[LastChunkIndex];
+            uint64_t                  Size           = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
+            BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
+            for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
+            {
+                size_t                    NextChunkIndex = LocationIndexes[LocationIndexOffset + RangeIndex];
+                const BlockStoreLocation& ChunkLocation  = ChunkLocations[NextChunkIndex];
+                if (ChunkLocation.Size == 0 || (ChunkLocation.Offset + ChunkLocation.Size > BlockSize))
+                {
+                    SmallSizeCallback(NextChunkIndex, nullptr, 0);
+                    continue;
+                }
+                void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
+                SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size);
+            }
+            LocationIndexOffset += RangeCount;
+            continue;
+        }
+        // The chunk alone exceeds the read window
+        if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize))
+        {
+            SmallSizeCallback(ChunkIndex, nullptr, 0);
+            LocationIndexOffset++;
+            continue;
+        }
+        LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size);
+        LocationIndexOffset++;
+    }
+}
+
+// File extension (including the leading dot) that identifies block files on disk.
+const char*
+BlockStore::GetBlockFileExtension()
+{
+    static constexpr const char* Extension = ".ucas";
+    return Extension;
+}
+
+// Builds the on-disk path of a block:
+//   <BlocksBasePath>/<first four hex characters>/<full hex string><extension>
+// where the hex string is produced by ToHexNumber(BlockIndex) and the extension
+// comes from GetBlockFileExtension().
+std::filesystem::path
+BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
+{
+    char HexName[9];
+    ToHexNumber(BlockIndex, HexName);
+
+    ExtendablePathBuilder<256> Builder;
+
+    // Base directory
+    Builder.Append(BlocksBasePath);
+    Builder.AppendSeparator();
+
+    // Sub-directory named after the leading four hex characters
+    Builder.AppendAsciiRange(HexName, HexName + 4);
+    Builder.AppendSeparator();
+
+    // File name: full hex string plus extension
+    Builder.Append(HexName);
+    Builder.Append(GetBlockFileExtension());
+
+    return Builder.ToPath();
+}
+
+#if ZEN_WITH_TESTS
+
+// Round-trips BlockStoreLocation values through BlockStoreDiskLocation (pack with a
+// payload alignment, unpack with the same alignment) at the extremes of each field.
+TEST_CASE("blockstore.blockstoredisklocation")
+{
+    // All fields zero
+    BlockStoreLocation AllZero{.BlockIndex = 0, .Offset = 0, .Size = 0};
+    CHECK(AllZero == BlockStoreDiskLocation(AllZero, 4).Get(4));
+
+    // Each field at its maximum on its own
+    BlockStoreLocation TopBlockIndex{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0};
+    CHECK(TopBlockIndex == BlockStoreDiskLocation(TopBlockIndex, 4).Get(4));
+
+    BlockStoreLocation TopOffset{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0};
+    CHECK(TopOffset == BlockStoreDiskLocation(TopOffset, 4).Get(4));
+
+    BlockStoreLocation TopSize{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits<uint32_t>::max()};
+    CHECK(TopSize == BlockStoreDiskLocation(TopSize, 4).Get(4));
+
+    // Combinations of maxima
+    BlockStoreLocation TopBlockIndexAndOffset{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
+                                              .Offset     = BlockStoreDiskLocation::MaxOffset * 4,
+                                              .Size       = 0};
+    CHECK(TopBlockIndexAndOffset == BlockStoreDiskLocation(TopBlockIndexAndOffset, 4).Get(4));
+
+    BlockStoreLocation TopEverything{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
+                                     .Offset     = BlockStoreDiskLocation::MaxOffset * 4,
+                                     .Size       = std::numeric_limits<uint32_t>::max()};
+    CHECK(TopEverything == BlockStoreDiskLocation(TopEverything, 4).Get(4));
+
+    // Same, with a 4096 byte payload alignment
+    BlockStoreLocation TopEverything4096{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
+                                         .Offset     = BlockStoreDiskLocation::MaxOffset * 4096,
+                                         .Size       = std::numeric_limits<uint32_t>::max()};
+    CHECK(TopEverything4096 == BlockStoreDiskLocation(TopEverything4096, 4096).Get(4096));
+
+    // Mid-range values for every field
+    BlockStoreLocation Halfway{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2,
+                               .Offset     = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4,
+                               .Size       = std::numeric_limits<uint32_t>::max() / 2};
+    CHECK(Halfway == BlockStoreDiskLocation(Halfway, 4).Get(4));
+}
+
+// Exercises BlockStoreFile directly: create + write, reopen + read, chunk buffers
+// that are still readable after the BlockStoreFile object is gone, and
+// delete-on-close.
+TEST_CASE("blockstore.blockfile")
+{
+    ScopedTemporaryDirectory TempDir;
+    auto                     BlocksDir = TempDir.Path() / "blocks";
+    CreateDirectories(BlocksDir);
+
+    // 1. Create the file and write two NUL terminated strings back to back
+    {
+        BlockStoreFile NewFile(BlocksDir / "1");
+        NewFile.Create(16384);
+        CHECK(NewFile.FileSize() == 0);
+
+        NewFile.Write("data", 5, 0);
+        IoBuffer FirstChunk = NewFile.GetChunk(0, 5);
+        NewFile.Write("boop", 5, 5);
+        IoBuffer SecondChunk = NewFile.GetChunk(5, 5);
+
+        const char* FirstText = static_cast<const char*>(FirstChunk.GetData());
+        CHECK(std::string(FirstText) == "data");
+        const char* SecondText = static_cast<const char*>(SecondChunk.GetData());
+        CHECK(std::string(SecondText) == "boop");
+
+        NewFile.Flush();
+        CHECK(NewFile.FileSize() == 10);
+    }
+
+    // 2. Reopen and verify both Read() and GetChunk() see the same content
+    {
+        BlockStoreFile Reopened(BlocksDir / "1");
+        Reopened.Open();
+
+        char FirstRaw[5];
+        Reopened.Read(FirstRaw, 5, 0);
+        CHECK(std::string(FirstRaw) == "data");
+        IoBuffer FirstChunk = Reopened.GetChunk(0, 5);
+
+        char SecondRaw[5];
+        Reopened.Read(SecondRaw, 5, 5);
+        CHECK(std::string(SecondRaw) == "boop");
+
+        IoBuffer    SecondChunk = Reopened.GetChunk(5, 5);
+        const char* FirstText   = static_cast<const char*>(FirstChunk.GetData());
+        CHECK(std::string(FirstText) == "data");
+        const char* SecondText = static_cast<const char*>(SecondChunk.GetData());
+        CHECK(std::string(SecondText) == "boop");
+    }
+
+    // 3. Chunk buffers must stay readable after the BlockStoreFile is destroyed
+    {
+        IoBuffer FirstChunk;
+        IoBuffer SecondChunk;
+
+        {
+            BlockStoreFile ShortLived(BlocksDir / "1");
+            ShortLived.Open();
+            FirstChunk  = ShortLived.GetChunk(0, 5);
+            SecondChunk = ShortLived.GetChunk(5, 5);
+        }
+
+        CHECK(std::filesystem::exists(BlocksDir / "1"));
+
+        const char* FirstText = static_cast<const char*>(FirstChunk.GetData());
+        CHECK(std::string(FirstText) == "data");
+        const char* SecondText = static_cast<const char*>(SecondChunk.GetData());
+        CHECK(std::string(SecondText) == "boop");
+    }
+    CHECK(std::filesystem::exists(BlocksDir / "1"));
+
+    // 4. With delete-on-close the chunks are still readable while they are held,
+    //    and the file is gone once they have been released
+    {
+        IoBuffer FirstChunk;
+        IoBuffer SecondChunk;
+
+        {
+            BlockStoreFile Doomed(BlocksDir / "1");
+            Doomed.Open();
+            Doomed.MarkAsDeleteOnClose();
+            FirstChunk  = Doomed.GetChunk(0, 5);
+            SecondChunk = Doomed.GetChunk(5, 5);
+        }
+
+        const char* FirstText = static_cast<const char*>(FirstChunk.GetData());
+        CHECK(std::string(FirstText) == "data");
+        const char* SecondText = static_cast<const char*>(SecondChunk.GetData());
+        CHECK(std::string(SecondText) == "boop");
+    }
+    CHECK(!std::filesystem::exists(BlocksDir / "1"));
+}
+
+// Helpers shared by the block store test cases below.
+namespace blockstore::impl {
+    // Writes String as one chunk and returns the location reported by the store.
+    BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment)
+    {
+        BlockStoreLocation Written;
+        Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Written = L; });
+        CHECK(Written.Size == String.length());
+        return Written;
+    }
+
+    // Reads the chunk at Location back as a string; an unavailable chunk yields "".
+    std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location)
+    {
+        IoBuffer Payload = Store.TryGetChunk(Location);
+        if (!Payload)
+        {
+            return "";
+        }
+        std::string Text((const char*)Payload.Data(), Payload.Size());
+        return Text;
+    }
+
+    // Recursively lists RootDir. Directories (if requested) come first in the
+    // result, followed by files (if requested).
+    std::vector<std::filesystem::path> GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories)
+    {
+        DirectoryContent Listing;
+        GetDirectoryContent(RootDir,
+                            DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) |
+                                (Directories ? DirectoryContent::IncludeDirsFlag : 0),
+                            Listing);
+
+        std::vector<std::filesystem::path> Paths;
+        Paths.insert(Paths.end(), Listing.Directories.begin(), Listing.Directories.end());
+        Paths.insert(Paths.end(), Listing.Files.begin(), Listing.Files.end());
+        return Paths;
+    }
+
+    // Produces a buffer of Size bytes: the values 0, 1, 2, ... (truncated to one
+    // byte) in shuffled order. The generator is seeded once from std::random_device.
+    static IoBuffer CreateChunk(uint64_t Size)
+    {
+        static std::random_device rd;
+        static std::mt19937       g(rd());
+
+        std::vector<uint8_t> Bytes;
+        Bytes.resize(Size);
+        for (size_t Position = 0; Position < Size; ++Position)
+        {
+            Bytes[Position] = static_cast<uint8_t>(Position);
+        }
+        std::shuffle(Bytes.begin(), Bytes.end(), g);
+
+        return IoBufferBuilder::MakeCloneFromMemory(Bytes.data(), Bytes.size());
+    }
+}  // namespace blockstore::impl
+
+// Writes three chunks into a store with 128 byte blocks and verifies that they can
+// be read back, and that the third one (too big for the rest of the first block)
+// lands in a different block.
+TEST_CASE("blockstore.chunks")
+{
+    using namespace blockstore::impl;
+
+    ScopedTemporaryDirectory TempDir;
+    auto                     StoreRoot = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(StoreRoot, 128, 1024, {});
+
+    // Nothing has been written yet, so no location can resolve
+    IoBuffer Missing = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
+    CHECK(!Missing);
+
+    std::string        FirstText      = "This is the data of the first chunk that we will write";
+    BlockStoreLocation FirstLocation  = WriteStringAsChunk(Store, FirstText, 4);
+    std::string        SecondText     = "This is the data for the second chunk that we will write";
+    BlockStoreLocation SecondLocation = WriteStringAsChunk(Store, SecondText, 4);
+
+    CHECK(ReadChunkAsString(Store, FirstLocation) == FirstText);
+    CHECK(ReadChunkAsString(Store, SecondLocation) == SecondText);
+
+    std::string ThirdText =
+        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
+    BlockStoreLocation ThirdLocation = WriteStringAsChunk(Store, ThirdText, 4);
+    CHECK(ThirdLocation.BlockIndex != FirstLocation.BlockIndex);
+
+    // Starting a new block must not disturb the earlier chunks
+    CHECK(ReadChunkAsString(Store, FirstLocation) == FirstText);
+    CHECK(ReadChunkAsString(Store, SecondLocation) == SecondText);
+    CHECK(ReadChunkAsString(Store, ThirdLocation) == ThirdText);
+}
+
+// Verifies that Initialize() removes block files that none of the known locations
+// refer to.
+TEST_CASE("blockstore.clean.stray.blocks")
+{
+    using namespace blockstore::impl;
+
+    ScopedTemporaryDirectory TempDir;
+    auto                     StoreRoot = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(StoreRoot / "store", 128, 1024, {});
+
+    std::string        FirstText      = "This is the data of the first chunk that we will write";
+    BlockStoreLocation FirstLocation  = WriteStringAsChunk(Store, FirstText, 4);
+    std::string        SecondText     = "This is the data for the second chunk that we will write";
+    BlockStoreLocation SecondLocation = WriteStringAsChunk(Store, SecondText, 4);
+    std::string        ThirdText =
+        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
+    WriteStringAsChunk(Store, ThirdText, 4);
+
+    Store.Close();
+
+    // The third chunk's location is not passed in, so the second block is
+    // unreferenced and should be deleted during initialization
+    Store.Initialize(StoreRoot / "store", 128, 1024, {FirstLocation, SecondLocation});
+
+    CHECK(GetDirectoryContent(StoreRoot / "store", true, false).size() == 1);
+}
+
+// Verifies that Flush() retires the current write block: three writes separated by
+// flushes must end up in three block files.
+TEST_CASE("blockstore.flush.forces.new.block")
+{
+    using namespace blockstore::impl;
+
+    ScopedTemporaryDirectory TempDir;
+    auto                     StoreRoot = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(StoreRoot / "store", 128, 1024, {});
+
+    std::string FirstText = "This is the data of the first chunk that we will write";
+    WriteStringAsChunk(Store, FirstText, 4);
+    Store.Flush();
+
+    std::string SecondText = "This is the data for the second chunk that we will write";
+    WriteStringAsChunk(Store, SecondText, 4);
+    Store.Flush();
+
+    std::string ThirdText =
+        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
+    WriteStringAsChunk(Store, ThirdText, 4);
+
+    CHECK(GetDirectoryContent(StoreRoot / "store", true, false).size() == 3);
+}
+
+TEST_CASE("blockstore.iterate.chunks")
+{
+    using namespace blockstore::impl;
+
+    // Exercises BlockStore::IterateChunks with a mix of valid and invalid locations:
+    //   index 0, 1 - two short chunks, expected through the in-memory callback
+    //   index 2    - a chunk of ScrubSmallChunkWindowSize * 2 bytes, expected through the file callback
+    //   index 3-5  - bad locations (zero size, out of range, unknown block), expected
+    //                through the in-memory callback with a null data pointer
+    ScopedTemporaryDirectory TempDir;
+    auto RootDirectory = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {});
+
+    // Nothing has been written yet, so this lookup must come back empty
+    IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
+    CHECK(!BadChunk);
+
+    std::string FirstChunkData = "This is the data of the first chunk that we will write";
+    BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
+
+    std::string SecondChunkData = "This is the data for the second chunk that we will write";
+    BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
+    Store.Flush();
+
+    std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L');
+    BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);
+
+    BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0};
+    BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0,
+                                                .Offset = ScrubSmallChunkWindowSize,
+                                                .Size = ScrubSmallChunkWindowSize * 2};
+    BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024};
+
+    Store.IterateChunks(
+        {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex},
+        [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+            switch (ChunkIndex)
+            {
+                case 0:
+                    CHECK(Data);
+                    CHECK(Size == FirstChunkData.size());
+                    CHECK(std::string((const char*)Data, Size) == FirstChunkData);
+                    break;
+                case 1:
+                    CHECK(Data);
+                    CHECK(Size == SecondChunkData.size());
+                    CHECK(std::string((const char*)Data, Size) == SecondChunkData);
+                    break;
+                case 2:
+                    // The large chunk must be delivered through the file callback instead
+                    CHECK(false);
+                    break;
+                case 3:
+                case 4:
+                case 5:
+                    // Bad locations are reported without any data
+                    CHECK(!Data);
+                    break;
+                default:
+                    CHECK(false);
+                    break;
+            }
+        },
+        [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+            // Only the large chunk (index 2) may arrive here
+            if (ChunkIndex != 2)
+            {
+                CHECK(false);
+                return;
+            }
+
+            CHECK(Size == VeryLargeChunk.size());
+
+            // Reassemble the streamed ranges into an owning buffer. Using std::string
+            // instead of new[]/delete[] releases the memory even if streaming throws.
+            std::string Buffer(Size, '\0');
+            size_t WriteOffset = 0;
+            File.StreamByteRange(Offset, Size, [&](const void* RangeData, uint64_t RangeSize) {
+                memcpy(Buffer.data() + WriteOffset, RangeData, RangeSize);
+                WriteOffset += RangeSize;
+            });
+            CHECK(memcmp(Buffer.data(), VeryLargeChunk.data(), Size) == 0);
+        });
+}
+
+TEST_CASE("blockstore.reclaim.space")
+{
+    using namespace blockstore::impl;
+
+    // Writes 200 chunks and runs BlockStore::ReclaimSpace in four configurations:
+    //   1. keep everything, final flag true (no callbacks supplied)
+    //   2. keep everything, final flag false - no callback may fire
+    //   3. drop the first DeleteChunkCount chunks - survivors must still verify
+    //   4. drop everything that is left
+    ScopedTemporaryDirectory TempDir;
+    auto RootDirectory = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(RootDirectory / "store", 512, 1024, {});
+
+    constexpr size_t ChunkCount = 200;
+    constexpr size_t Alignment = 8;
+    std::vector<BlockStoreLocation> ChunkLocations;
+    std::vector<IoHash> ChunkHashes;
+    ChunkLocations.reserve(ChunkCount);
+    ChunkHashes.reserve(ChunkCount);
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        IoBuffer Chunk = CreateChunk(57 + ChunkIndex);
+
+        Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); });
+        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+    }
+
+    // Start out by keeping every chunk
+    std::vector<size_t> ChunksToKeep;
+    ChunksToKeep.reserve(ChunkLocations.size());
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        ChunksToKeep.push_back(ChunkIndex);
+    }
+
+    Store.Flush();
+    BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState();
+    Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true);
+
+    // If we keep all the chunks we should not get any callbacks on moved/deleted stuff
+    Store.ReclaimSpace(
+        State1,
+        ChunkLocations,
+        ChunksToKeep,
+        Alignment,
+        false,
+        [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); },
+        []() {
+            CHECK(false);
+            return 0;
+        });
+
+    // Now drop the first DeleteChunkCount chunks and keep the rest
+    size_t DeleteChunkCount = 38;
+    ChunksToKeep.clear();
+    for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        ChunksToKeep.push_back(ChunkIndex);
+    }
+
+    std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations;
+    size_t MovedChunkCount = 0;
+    size_t DeletedChunkCount = 0;
+    Store.ReclaimSpace(
+        State1,
+        ChunkLocations,
+        ChunksToKeep,
+        Alignment,
+        false,
+        [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
+            // Only kept chunks may move; record where they ended up
+            for (const auto& MovedChunk : MovedChunks)
+            {
+                CHECK(MovedChunk.first >= DeleteChunkCount);
+                NewChunkLocations[MovedChunk.first] = MovedChunk.second;
+            }
+            MovedChunkCount += MovedChunks.size();
+            // Only chunks that were not kept may be reported as deleted
+            for (size_t DeletedIndex : DeletedChunks)
+            {
+                CHECK(DeletedIndex < DeleteChunkCount);
+            }
+            DeletedChunkCount += DeletedChunks.size();
+        },
+        []() {
+            CHECK(false);
+            return 0;
+        });
+    CHECK(MovedChunkCount <= DeleteChunkCount);
+    CHECK(DeletedChunkCount == DeleteChunkCount);
+    ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end());
+
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        // Every location is probed once, including the deleted ones (whose result is
+        // ignored); only the surviving chunks have to verify against their hash.
+        IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]);
+        if (ChunkIndex < DeleteChunkCount)
+        {
+            continue;
+        }
+        CHECK(VerifyChunk);
+        IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+        CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+    }
+
+    // Finally drop everything that is left
+    NewChunkLocations = ChunkLocations;
+    MovedChunkCount = 0;
+    DeletedChunkCount = 0;
+    Store.ReclaimSpace(
+        State1,
+        ChunkLocations,
+        {},
+        Alignment,
+        false,
+        [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
+            CHECK(MovedChunks.empty());
+            DeletedChunkCount += DeletedChunks.size();
+        },
+        []() {
+            CHECK(false);
+            return 0;
+        });
+    CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount);
+}
+
+TEST_CASE("blockstore.thread.read.write")
+{
+    using namespace blockstore::impl;
+
+    // Three phases on an 8-thread pool: write all chunks, read them all back and
+    // verify their hashes, then write a second copy of every chunk while the first
+    // copies are being read and verified again.
+    ScopedTemporaryDirectory TempDir;
+    const auto StorePath = TempDir.Path() / "store";
+
+    BlockStore Store;
+    Store.Initialize(StorePath, 1088, 1024, {});
+
+    constexpr size_t ChunkCount = 1000;
+    constexpr size_t Alignment = 8;
+
+    std::vector<IoBuffer> Chunks;
+    std::vector<IoHash> ChunkHashes;
+    Chunks.reserve(ChunkCount);
+    ChunkHashes.reserve(ChunkCount);
+    for (size_t Index = 0; Index < ChunkCount; ++Index)
+    {
+        IoBuffer Chunk = CreateChunk(57 + Index / 2);
+        Chunks.push_back(Chunk);
+        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+    }
+
+    std::vector<BlockStoreLocation> ChunkLocations;
+    ChunkLocations.resize(ChunkCount);
+
+    WorkerThreadPool WorkerPool(8);
+    std::atomic<size_t> WorkCompleted = 0;
+
+    // Polls until the given number of scheduled jobs have reported completion
+    auto WaitForCompleted = [&WorkCompleted](size_t ExpectedCount) {
+        while (WorkCompleted < ExpectedCount)
+        {
+            Sleep(1);
+        }
+    };
+
+    // Phase 1: concurrent writes, each job records the location of its own chunk
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
+            IoBuffer& Payload = Chunks[ChunkIndex];
+            Store.WriteChunk(Payload.Data(), Payload.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
+            WorkCompleted.fetch_add(1);
+        });
+    }
+    WaitForCompleted(Chunks.size());
+
+    // Phase 2: concurrent reads, verified against the hashes taken up front
+    WorkCompleted = 0;
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+            IoBuffer Stored = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+            CHECK(Stored);
+            IoHash StoredHash = IoHash::HashBuffer(Stored.Data(), Stored.Size());
+            CHECK(StoredHash == ChunkHashes[ChunkIndex]);
+            WorkCompleted.fetch_add(1);
+        });
+    }
+    WaitForCompleted(Chunks.size());
+
+    // Phase 3: interleave a second round of writes with reads of the first round
+    std::vector<BlockStoreLocation> SecondChunkLocations;
+    SecondChunkLocations.resize(ChunkCount);
+    WorkCompleted = 0;
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
+            IoBuffer& Payload = Chunks[ChunkIndex];
+            Store.WriteChunk(Payload.Data(), Payload.Size(), Alignment, [&](const BlockStoreLocation& L) {
+                SecondChunkLocations[ChunkIndex] = L;
+            });
+            WorkCompleted.fetch_add(1);
+        });
+        WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+            IoBuffer Stored = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+            CHECK(Stored);
+            IoHash StoredHash = IoHash::HashBuffer(Stored.Data(), Stored.Size());
+            CHECK(StoredHash == ChunkHashes[ChunkIndex]);
+            WorkCompleted.fetch_add(1);
+        });
+    }
+    // Two jobs were scheduled per chunk in this phase
+    WaitForCompleted(Chunks.size() * 2);
+}
+
+#endif
+
+// Intentionally empty. NOTE(review): presumably referenced from elsewhere so that this
+// translation unit (and the test cases registered in it) is not dropped by the linker - confirm.
+void
+blockstore_forcelink()
+{
+}
+
+} // namespace zen
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
new file mode 100644
index 000000000..fdec78c60
--- /dev/null
+++ b/src/zenstore/cas.cpp
@@ -0,0 +1,355 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "cas.h"
+
+#include "compactcas.h"
+#include "filecas.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory.h>
+#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+#include <zencore/uid.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/gc.h>
+#include <zenstore/scrubcontext.h>
+
+#include <gsl/gsl-lite.hpp>
+
+#include <filesystem>
+#include <functional>
+#include <unordered_map>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+/**
+ * CAS store implementation
+ *
+ * Payloads are routed by size to one of three strategies (tiny, small, large). The
+ * intent is to be able to reclaim space quickly for unused large chunks while keeping
+ * locality for small chunks which are frequently accessed together.
+ */
+class CasImpl : public CasStore
+{
+public:
+    CasImpl(GcManager& Gc);
+    ~CasImpl() override;
+
+    // CasStore interface
+    void Initialize(const CidStoreConfiguration& InConfig) override;
+    CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override;
+    IoBuffer FindChunk(const IoHash& ChunkHash) override;
+    bool ContainsChunk(const IoHash& ChunkHash) override;
+    void FilterChunks(HashKeySet& InOutChunks) override;
+    void Flush() override;
+    void Scrub(ScrubContext& Ctx) override;
+    void GarbageCollect(GcContext& GcCtx) override;
+    CidStoreSize TotalSize() const override;
+
+private:
+    enum class StorageScheme
+    {
+        Legacy = 0,
+        WithCbManifest = 1
+    };
+
+    // Opens the '.ucas_root' manifest or creates a new one; returns true for a new store
+    bool OpenOrCreateManifest();
+    // Writes m_ManifestObject to '.ucas_root', generating it first when it is empty
+    void UpdateManifest();
+
+    // Declaration order matters here: the constructor initializes these in this order
+    CasContainerStrategy m_TinyStrategy;
+    CasContainerStrategy m_SmallStrategy;
+    FileCasStrategy m_LargeStrategy;
+    CbObject m_ManifestObject;
+
+    StorageScheme m_StorageScheme = StorageScheme::Legacy;
+};
+
+// Constructs the tiny, small and large storage strategies, handing each of them the same GC manager.
+CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc)
+{
+}
+
+// No explicit teardown; members are destroyed in reverse declaration order.
+CasImpl::~CasImpl() = default;
+
+/**
+ * Stores the configuration, makes sure the root directory exists, opens or creates
+ * the store manifest and then initializes the large, tiny and small strategies.
+ */
+void
+CasImpl::Initialize(const CidStoreConfiguration& InConfig)
+{
+    m_Config = InConfig;
+
+    const auto& RootDir = m_Config.RootDirectory;
+
+    ZEN_INFO("initializing CAS pool at '{}'", RootDir);
+
+    // Create the root directory if it is not there yet
+    std::filesystem::create_directories(RootDir);
+
+    // The manifest tells us whether this is a brand new store
+    const bool IsNewStore = OpenOrCreateManifest();
+
+    // Block sizes and alignments for the two container strategies
+    constexpr auto TinyBlockSize = 1u << 28;   // 256 Mb per block
+    constexpr auto SmallBlockSize = 1u << 30;  // 1 Gb per block
+    constexpr auto TinyAlignment = 16;
+    constexpr auto SmallAlignment = 4096;
+
+    // Initialize payload storage
+    m_LargeStrategy.Initialize(RootDir, IsNewStore);
+    m_TinyStrategy.Initialize(RootDir, "tobs", TinyBlockSize, TinyAlignment, IsNewStore);
+    m_SmallStrategy.Initialize(RootDir, "sobs", SmallBlockSize, SmallAlignment, IsNewStore);
+}
+
+/**
+ * Opens the '.ucas_root' manifest in the root directory, or (re)creates it.
+ *
+ * A manifest is accepted when it validates as compact binary and contains an "id"
+ * field; it is then kept in m_ManifestObject. In every other case (missing file,
+ * open failure, old-style manifest starting with '#', validation failure, missing
+ * "id") UpdateManifest() is called to write a manifest.
+ *
+ * @return true only when the manifest file did not exist, i.e. this is a new store
+ */
+bool
+CasImpl::OpenOrCreateManifest()
+{
+    bool IsNewStore = false;
+
+    std::filesystem::path ManifestPath = m_Config.RootDirectory;
+    ManifestPath /= ".ucas_root";
+
+    std::error_code Ec;
+    BasicFile ManifestFile;
+    ManifestFile.Open(ManifestPath.c_str(), BasicFile::Mode::kRead, Ec);
+
+    bool ManifestIsOk = false;
+
+    if (Ec)
+    {
+        if (Ec == std::errc::no_such_file_or_directory)
+        {
+            IsNewStore = true;
+        }
+        else
+        {
+            // Do not swallow unexpected failures silently; we still try to recover below
+            ZEN_WARN("Failed to open store manifest '{}': {}, will generate new manifest to recover", ManifestPath, Ec.message());
+        }
+    }
+    else
+    {
+        IoBuffer ManifestBuffer = ManifestFile.ReadAll();
+        ManifestFile.Close();
+
+        // Old-style manifests do not contain any useful information, so we may as well update them
+        const bool IsOldStyleManifest = ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#';
+
+        if (!IsOldStyleManifest)
+        {
+            // Validate BEFORE interpreting the buffer as an object, so that no object view
+            // is ever created over empty or corrupt data
+            const CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All);
+
+            if (ValidationResult == CbValidateError::None)
+            {
+                CbObject Manifest{SharedBuffer(ManifestBuffer)};
+
+                if (Manifest["id"])
+                {
+                    m_ManifestObject = std::move(Manifest);
+                    ManifestIsOk = true;
+                }
+            }
+            else
+            {
+                ZEN_WARN("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult));
+            }
+        }
+    }
+
+    if (!ManifestIsOk)
+    {
+        UpdateManifest();
+    }
+
+    return IsNewStore;
+}
+
+/**
+ * Writes m_ManifestObject to '.ucas_root' in the root directory, truncating any
+ * existing file. When no manifest object is present yet, one is generated first
+ * with a fresh "id" and a "created" timestamp.
+ */
+void
+CasImpl::UpdateManifest()
+{
+    const bool HasManifest = bool(m_ManifestObject);
+    if (!HasManifest)
+    {
+        CbObjectWriter Writer;
+        Writer << "id" << zen::Oid::NewOid() << "created" << DateTime::Now();
+        m_ManifestObject = Writer.Save();
+    }
+
+    std::filesystem::path ManifestPath = m_Config.RootDirectory;
+    ManifestPath /= ".ucas_root";
+
+    ZEN_TRACE("Writing new manifest to '{}'", ManifestPath);
+
+    // Open() is used without an error code here; the original notes that this throws on failure
+    BasicFile ManifestFile;
+    ManifestFile.Open(ManifestPath.c_str(), BasicFile::Mode::kTruncate);
+    ManifestFile.Write(m_ManifestObject.GetBuffer(), 0);
+}
+
+/**
+ * Routes the chunk to a strategy based on its size: below TinyValueThreshold it goes
+ * to the tiny strategy, below HugeValueThreshold to the small strategy, anything else
+ * to the large strategy. Only the large strategy receives the insert mode.
+ */
+CasStore::InsertResult
+CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode)
+{
+    ZEN_TRACE_CPU("CAS::InsertChunk");
+
+    const uint64_t PayloadSize = Chunk.Size();
+    const bool IsTiny = PayloadSize < m_Config.TinyValueThreshold;
+
+    if (IsTiny)
+    {
+        // Empty payloads are not expected on this path
+        ZEN_ASSERT(PayloadSize);
+
+        return m_TinyStrategy.InsertChunk(Chunk, ChunkHash);
+    }
+
+    if (PayloadSize < m_Config.HugeValueThreshold)
+    {
+        return m_SmallStrategy.InsertChunk(Chunk, ChunkHash);
+    }
+
+    return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode);
+}
+
+/**
+ * Looks the chunk up in the small, tiny and large strategies (in that order) and
+ * returns the first hit, or an empty IoBuffer when none of them has it.
+ */
+IoBuffer
+CasImpl::FindChunk(const IoHash& ChunkHash)
+{
+    ZEN_TRACE_CPU("CAS::FindChunk");
+
+    IoBuffer Chunk = m_SmallStrategy.FindChunk(ChunkHash);
+    if (Chunk)
+    {
+        return Chunk;
+    }
+
+    Chunk = m_TinyStrategy.FindChunk(ChunkHash);
+    if (Chunk)
+    {
+        return Chunk;
+    }
+
+    Chunk = m_LargeStrategy.FindChunk(ChunkHash);
+    if (Chunk)
+    {
+        return Chunk;
+    }
+
+    // No strategy knows this hash
+    return IoBuffer{};
+}
+
+/** Returns true if any strategy has the chunk; probes small, tiny, then large and stops at the first hit. */
+bool
+CasImpl::ContainsChunk(const IoHash& ChunkHash)
+{
+    if (m_SmallStrategy.HaveChunk(ChunkHash))
+    {
+        return true;
+    }
+    if (m_TinyStrategy.HaveChunk(ChunkHash))
+    {
+        return true;
+    }
+    return m_LargeStrategy.HaveChunk(ChunkHash);
+}
+
+// Passes the same in/out hash set through the small, tiny and large strategies in turn.
+void
+CasImpl::FilterChunks(HashKeySet& InOutChunks)
+{
+ m_SmallStrategy.FilterChunks(InOutChunks);
+ m_TinyStrategy.FilterChunks(InOutChunks);
+ m_LargeStrategy.FilterChunks(InOutChunks);
+}
+
+// Flushes the small, tiny and large strategies, in that order.
+void
+CasImpl::Flush()
+{
+ m_SmallStrategy.Flush();
+ m_TinyStrategy.Flush();
+ m_LargeStrategy.Flush();
+}
+
+/**
+ * Scrubs all three strategies, at most once per scrub timestamp: a context carrying
+ * the timestamp recorded by the previous call is ignored.
+ */
+void
+CasImpl::Scrub(ScrubContext& Ctx)
+{
+    const bool IsNewScrubPass = !(m_LastScrubTime == Ctx.ScrubTimestamp());
+
+    if (IsNewScrubPass)
+    {
+        // Record the timestamp before scrubbing
+        m_LastScrubTime = Ctx.ScrubTimestamp();
+
+        m_SmallStrategy.Scrub(Ctx);
+        m_TinyStrategy.Scrub(Ctx);
+        m_LargeStrategy.Scrub(Ctx);
+    }
+}
+
+// Runs garbage collection on the small, tiny and large strategies with the same GC context.
+void
+CasImpl::GarbageCollect(GcContext& GcCtx)
+{
+ m_SmallStrategy.CollectGarbage(GcCtx);
+ m_TinyStrategy.CollectGarbage(GcCtx);
+ m_LargeStrategy.CollectGarbage(GcCtx);
+}
+
+/** Reports the disk size of each strategy together with their sum. */
+CidStoreSize
+CasImpl::TotalSize() const
+{
+    const uint64_t TinyBytes = m_TinyStrategy.StorageSize().DiskSize;
+    const uint64_t SmallBytes = m_SmallStrategy.StorageSize().DiskSize;
+    const uint64_t LargeBytes = m_LargeStrategy.StorageSize().DiskSize;
+    const uint64_t AllBytes = TinyBytes + SmallBytes + LargeBytes;
+
+    return {.TinySize = TinyBytes, .SmallSize = SmallBytes, .LargeSize = LargeBytes, .TotalSize = AllBytes};
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Factory for the CAS store: returns a CasImpl, constructed with the given GC manager,
+// behind the CasStore interface.
+std::unique_ptr<CasStore>
+CreateCasStore(GcManager& Gc)
+{
+ return std::make_unique<CasImpl>(Gc);
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Testing related code follows...
+//
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("CasStore")
+{
+    // Smoke test: initialize a store in a temporary directory, scrub it while empty,
+    // insert two 16 byte chunks and make sure both can be filtered and found again.
+    ScopedTemporaryDirectory TempDir;
+
+    CidStoreConfiguration StoreConfig;
+    StoreConfig.RootDirectory = TempDir.Path();
+
+    GcManager Gc;
+
+    std::unique_ptr<CasStore> Store = CreateCasStore(Gc);
+    Store->Initialize(StoreConfig);
+
+    ScrubContext Ctx;
+    Store->Scrub(Ctx);
+
+    // First chunk
+    IoBuffer DigitsValue{16};
+    memcpy(DigitsValue.MutableData(), "1234567890123456", 16);
+    IoHash DigitsHash = IoHash::HashBuffer(DigitsValue.Data(), DigitsValue.Size());
+    CasStore::InsertResult DigitsResult = Store->InsertChunk(DigitsValue, DigitsHash);
+    CHECK(DigitsResult.New);
+
+    // Second chunk
+    IoBuffer LettersValue{16};
+    memcpy(LettersValue.MutableData(), "ABCDEFGHIJKLMNOP", 16);
+    IoHash LettersHash = IoHash::HashBuffer(LettersValue.Data(), LettersValue.Size());
+    CasStore::InsertResult LettersResult = Store->InsertChunk(LettersValue, LettersHash);
+    CHECK(LettersResult.New);
+
+    // Both hashes are in the store, so the set is expected to be empty after filtering
+    HashKeySet Hashes;
+    Hashes.AddHashToSet(DigitsHash);
+    Hashes.AddHashToSet(LettersHash);
+
+    Store->FilterChunks(Hashes);
+    CHECK(Hashes.IsEmpty());
+
+    IoBuffer FoundDigits = Store->FindChunk(DigitsHash);
+    CHECK(FoundDigits);
+    IoBuffer FoundLetters = Store->FindChunk(LettersHash);
+    CHECK(FoundLetters);
+}
+
+#endif
+
+// Intentionally empty. Defined outside of the ZEN_WITH_TESTS guard because cas.h
+// declares it unconditionally; this way a reference to it always links, whether or
+// not tests are compiled in. NOTE(review): presumably only referenced to keep this
+// translation unit from being dropped by the linker - confirm.
+void
+CAS_forcelink()
+{
+}
+
+} // namespace zen
diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h
new file mode 100644
index 000000000..9c48d4707
--- /dev/null
+++ b/src/zenstore/cas.h
@@ -0,0 +1,67 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/blake3.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/refcount.h>
+#include <zencore/timer.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/hashkeyset.h>
+
+#include <atomic>
+#include <filesystem>
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_set>
+
+namespace zen {
+
+class GcContext;
+class GcManager;
+class ScrubContext;
+
+/** Content Addressable Storage interface
+ *
+ * Abstract interface for a store in which chunks are inserted and looked up by their
+ * IoHash. Instances are created through CreateCasStore().
+ */
+
+class CasStore
+{
+public:
+    virtual ~CasStore() = default;
+
+    /** Returns the stored configuration (m_Config). Read-only, so callable on const stores too. */
+    const CidStoreConfiguration& Config() const { return m_Config; }
+
+    /** Outcome of InsertChunk */
+    struct InsertResult
+    {
+        bool New = false;
+    };
+
+    /** Hint passed along with the payload to InsertChunk.
+     * NOTE(review): presumably controls whether the source data may be moved into the
+     * store rather than copied - confirm against the strategy implementations.
+     */
+    enum class InsertMode
+    {
+        kCopyOnly,
+        kMayBeMovedInPlace
+    };
+
+    virtual void Initialize(const CidStoreConfiguration& Config) = 0;
+    virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0;
+    virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
+    virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
+    virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
+    virtual void Flush() = 0;
+    virtual void Scrub(ScrubContext& Ctx) = 0;
+    virtual void GarbageCollect(GcContext& GcCtx) = 0;
+    virtual CidStoreSize TotalSize() const = 0;
+
+protected:
+    CidStoreConfiguration m_Config;
+    // Timestamp of the scrub context handled most recently; 0 until the first scrub
+    uint64_t m_LastScrubTime = 0;
+};
+
+// Creates a CAS store instance; the GC manager is handed on to the implementation.
+ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc);
+
+// Empty function defined in cas.cpp next to the CAS test cases.
+void CAS_forcelink();
+
+} // namespace zen
diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp
new file mode 100644
index 000000000..2a978ae12
--- /dev/null
+++ b/src/zenstore/caslog.cpp
@@ -0,0 +1,236 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/caslog.h>
+
+#include "compactcas.h"
+
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zencore/uid.h>
+
+#include <xxhash.h>
+
+#include <gsl/gsl-lite.hpp>
+
+#include <filesystem>
+#include <functional>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+/**
+ * Computes the XXH32 checksum of the header with a fixed seed, over
+ * sizeof(FileHeader) - 4 bytes starting at the Magic member.
+ * NOTE(review): presumably the 4 excluded bytes are the trailing Checksum field - confirm against caslog.h.
+ */
+uint32_t
+CasLogFile::FileHeader::ComputeChecksum()
+{
+    constexpr size_t HashedByteCount = sizeof(FileHeader) - 4;
+    constexpr uint32_t ChecksumSeed = 0xC0C0'BABA;
+
+    return XXH32(&Magic, HashedByteCount, ChecksumSeed);
+}
+
+// Nothing to set up; the log is unusable until Open() has been called.
+CasLogFile::CasLogFile() = default;
+
+// No explicit teardown here; Close() is not called by the destructor.
+CasLogFile::~CasLogFile() = default;
+
+/**
+ * Checks whether FileName looks like a usable log file for the given record size.
+ *
+ * @return true when the path is a regular file that can be opened for reading, is at
+ *         least one header in size, carries the expected magic and checksum, and
+ *         whose header record size equals RecordSize; false otherwise
+ */
+bool
+CasLogFile::IsValid(std::filesystem::path FileName, size_t RecordSize)
+{
+    if (!std::filesystem::is_regular_file(FileName))
+    {
+        return false;
+    }
+
+    BasicFile LogFile;
+    std::error_code OpenError;
+    LogFile.Open(FileName, BasicFile::Mode::kRead, OpenError);
+    if (OpenError)
+    {
+        return false;
+    }
+
+    FileHeader Header;
+    if (LogFile.FileSize() < sizeof(Header))
+    {
+        // Too short to even hold a header
+        return false;
+    }
+
+    LogFile.Read(&Header, sizeof Header, 0);
+
+    const bool MagicMatches = (0 == memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic));
+    if (!MagicMatches)
+    {
+        return false;
+    }
+    if (Header.Checksum != Header.ComputeChecksum())
+    {
+        return false;
+    }
+
+    return Header.RecordSize == RecordSize;
+}
+
+/**
+ * Opens (or initializes) the log file and positions the append offset.
+ *
+ * When truncating, or when the file is smaller than a header, a fresh header is
+ * written and appending starts right after it. Otherwise the existing header is
+ * validated and appending starts at the last whole-record boundary of the file.
+ *
+ * @param FileName   path of the log file
+ * @param RecordSize size in bytes of one log record; must match an existing header
+ * @param Mode       kWrite / kTruncate select the matching file mode, anything else opens read-only
+ * @throws std::system_error  if the file cannot be opened
+ * @throws std::runtime_error if the header is missing (read mode), corrupt, or has a different record size
+ */
+void
+CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode)
+{
+    m_RecordSize = RecordSize;
+
+    std::error_code Ec;
+    BasicFile::Mode FileMode = BasicFile::Mode::kRead;
+    switch (Mode)
+    {
+        case Mode::kWrite:
+            FileMode = BasicFile::Mode::kWrite;
+            break;
+        case Mode::kTruncate:
+            FileMode = BasicFile::Mode::kTruncate;
+            break;
+        default:
+            // Every other mode keeps the read-only default
+            break;
+    }
+
+    m_File.Open(FileName, FileMode, Ec);
+    if (Ec)
+    {
+        throw std::system_error(Ec, fmt::format("Failed to open log file '{}'", FileName));
+    }
+
+    uint64_t AppendOffset = 0;
+
+    if ((Mode == Mode::kTruncate) || (m_File.FileSize() < sizeof(FileHeader)))
+    {
+        if (Mode == Mode::kRead)
+        {
+            // A read-only log cannot be repaired by writing a new header
+            throw std::runtime_error(fmt::format("Mangled log header (file too small) in '{}'", FileName));
+        }
+        // Initialize log by writing header
+        FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0};
+        memcpy(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic);
+        Header.Finalize();
+
+        m_File.Write(&Header, sizeof Header, 0);
+
+        AppendOffset = sizeof(FileHeader);
+
+        m_Header = Header;
+    }
+    else
+    {
+        FileHeader Header;
+        m_File.Read(&Header, sizeof Header, 0);
+
+        if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum()))
+        {
+            throw std::runtime_error(fmt::format("Mangled log header (invalid header magic) in '{}'", FileName));
+        }
+        if (Header.RecordSize != RecordSize)
+        {
+            throw std::runtime_error(fmt::format("Mangled log header (mismatch in record size, expected {}, found {}) in '{}'",
+                                                 RecordSize,
+                                                 Header.RecordSize,
+                                                 FileName));
+        }
+
+        AppendOffset = m_File.FileSize();
+
+        // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended
+
+        AppendOffset -= sizeof Header;
+        AppendOffset -= AppendOffset % RecordSize;
+        AppendOffset += sizeof Header;
+
+        m_Header = Header;
+    }
+
+    m_AppendOffset = AppendOffset;
+}
+
+// Flushes pending writes and closes the underlying file. The header is not rewritten here.
+void
+CasLogFile::Close()
+{
+ // TODO: update header and maybe add trailer
+ Flush();
+
+ m_File.Close();
+}
+
+// Returns the current size of the underlying file (as reported by the file itself, not the append offset).
+uint64_t
+CasLogFile::GetLogSize()
+{
+ return m_File.FileSize();
+}
+
+/**
+ * Returns the number of whole records between the end of the header and the current
+ * append offset, or 0 when the append offset lies inside the header.
+ */
+uint64_t
+CasLogFile::GetLogCount()
+{
+    const uint64_t HeaderSize = sizeof(FileHeader);
+    const uint64_t AppendPosition = m_AppendOffset.load(std::memory_order_acquire);
+
+    if (AppendPosition >= HeaderSize)
+    {
+        const size_t EntryCount = (AppendPosition - HeaderSize) / m_RecordSize;
+        return EntryCount;
+    }
+
+    return 0;
+}
+
+/**
+ * Reads the log records into memory and invokes Handler once per record, in file order.
+ *
+ * @param Handler        called with a pointer to each record (m_RecordSize bytes)
+ * @param SkipEntryCount number of leading records to skip
+ *
+ * Does nothing when the file holds no complete header or no records beyond the
+ * skipped ones. Otherwise the append offset is moved to just past the last whole
+ * record once all handlers have run.
+ */
+void
+CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount)
+{
+    const uint64_t LogFileSize = m_File.FileSize();
+    uint64_t LogBaseOffset = sizeof(FileHeader);
+
+    // A file shorter than the header holds no records; bail out before the unsigned
+    // subtraction below can wrap around
+    if (LogFileSize < LogBaseOffset)
+    {
+        return;
+    }
+
+    // Ensure we end up on a clean boundary
+    size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize;
+
+    if (LogEntryCount <= SkipEntryCount)
+    {
+        return;
+    }
+
+    LogBaseOffset += SkipEntryCount * m_RecordSize;
+    LogEntryCount -= SkipEntryCount;
+
+    // This should really be streaming the data rather than just
+    // reading it into memory, though we don't tend to get very
+    // large logs so it may not matter
+
+    const uint64_t LogDataSize = LogEntryCount * m_RecordSize;
+
+    std::vector<uint8_t> ReadBuffer(LogDataSize);
+
+    m_File.Read(ReadBuffer.data(), LogDataSize, LogBaseOffset);
+
+    // Index with the same width as the entry count so that large logs are not truncated
+    for (size_t EntryIndex = 0; EntryIndex < LogEntryCount; ++EntryIndex)
+    {
+        Handler(ReadBuffer.data() + (EntryIndex * m_RecordSize));
+    }
+
+    m_AppendOffset = LogBaseOffset + (m_RecordSize * LogEntryCount);
+}
+
+/**
+ * Appends one or more whole records to the log.
+ *
+ * The target range is reserved by atomically advancing the append offset before the
+ * write is issued, so the offset stays advanced even if the write fails.
+ *
+ * @param DataPointer start of the record data
+ * @param DataSize    number of bytes to write; asserted to be a multiple of the record size
+ * @throws std::system_error if the write fails
+ */
+void
+CasLogFile::Append(const void* DataPointer, uint64_t DataSize)
+{
+    ZEN_ASSERT((DataSize % m_RecordSize) == 0);
+
+    const uint64_t WriteOffset = m_AppendOffset.fetch_add(DataSize);
+
+    std::error_code WriteError;
+    m_File.Write(DataPointer, gsl::narrow<uint32_t>(DataSize), WriteOffset, WriteError);
+
+    if (!WriteError)
+    {
+        return;
+    }
+
+    throw std::system_error(WriteError, fmt::format("Failed to write to log file '{}'", PathFromHandle(m_File.Handle())));
+}
+
+// Forwards to the underlying file's Flush().
+void
+CasLogFile::Flush()
+{
+ m_File.Flush();
+}
+
+} // namespace zen
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
new file mode 100644
index 000000000..5a5116faf
--- /dev/null
+++ b/src/zenstore/cidstore.cpp
@@ -0,0 +1,125 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenstore/cidstore.h"
+
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
+#include <zenstore/scrubcontext.h>
+
+#include "cas.h"
+
+#include <filesystem>
+
+namespace zen {
+
+/**
+ * Private implementation of CidStore: a thin layer over a CasStore which marks
+ * incoming payloads as compressed binary and de-duplicates scrub requests.
+ */
+struct CidStore::Impl
+{
+    Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {}
+
+    CasStore& m_CasStore;
+
+    void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); }
+
+    /** Tags the payload as compressed binary and inserts it into the CAS store under RawHash. */
+    CidStore::InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, CidStore::InsertMode Mode)
+    {
+#ifndef NDEBUG
+        // Debug builds only: the compressed header must validate and carry the raw hash we were given
+        IoHash HeaderRawHash;
+        uint64_t Unused;
+        ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, HeaderRawHash, Unused) && RawHash == HeaderRawHash);
+#endif  // NDEBUG
+        IoBuffer TaggedPayload(ChunkData);
+        TaggedPayload.SetContentType(ZenContentType::kCompressedBinary);
+
+        const auto CasMode = static_cast<CasStore::InsertMode>(Mode);
+        CasStore::InsertResult CasResult = m_CasStore.InsertChunk(TaggedPayload, RawHash, CasMode);
+
+        return {.New = CasResult.New};
+    }
+
+    IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); }
+
+    bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); }
+
+    /** Removes every hash from the set for which ContainsChunk() returns true. */
+    void FilterChunks(HashKeySet& InOutChunks)
+    {
+        InOutChunks.RemoveHashesIf([this](const IoHash& Hash) { return ContainsChunk(Hash); });
+    }
+
+    void Flush() { m_CasStore.Flush(); }
+
+    /** Scrubs the CAS store unless this context's timestamp was already handled by the previous call. */
+    void Scrub(ScrubContext& Ctx)
+    {
+        const bool AlreadyScrubbed = (Ctx.ScrubTimestamp() == m_LastScrubTime);
+
+        if (!AlreadyScrubbed)
+        {
+            m_LastScrubTime = Ctx.ScrubTimestamp();
+
+            m_CasStore.Scrub(Ctx);
+        }
+    }
+
+    uint64_t m_LastScrubTime = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+// Creates the underlying CAS store first and then the implementation object that wraps it.
+CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore))
+{
+}
+
+// Defined in this file, where Impl is a complete type; no explicit teardown is needed.
+CidStore::~CidStore() = default;
+
+// Forwards the configuration to the implementation, which initializes the underlying CAS store.
+void
+CidStore::Initialize(const CidStoreConfiguration& Config)
+{
+ m_Impl->Initialize(Config);
+}
+
+// Forwards to Impl::AddChunk, which tags the payload as compressed binary and inserts it under RawHash.
+CidStore::InsertResult
+CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode)
+{
+ return m_Impl->AddChunk(ChunkData, RawHash, Mode);
+}
+
+// Looks the chunk up in the underlying CAS store by the given id.
+IoBuffer
+CidStore::FindChunkByCid(const IoHash& DecompressedId)
+{
+ return m_Impl->FindChunkByCid(DecompressedId);
+}
+
+// Returns whether the underlying CAS store has a chunk for the given id.
+bool
+CidStore::ContainsChunk(const IoHash& DecompressedId)
+{
+ return m_Impl->ContainsChunk(DecompressedId);
+}
+
+// Removes from the set every hash that the store already contains.
+void
+CidStore::FilterChunks(HashKeySet& InOutChunks)
+{
+    m_Impl->FilterChunks(InOutChunks);
+}
+
+// Flushes the underlying CAS store.
+void
+CidStore::Flush()
+{
+ m_Impl->Flush();
+}
+
+// Forwards to Impl::Scrub, which skips the scrub when this context's timestamp was already handled.
+void
+CidStore::Scrub(ScrubContext& Ctx)
+{
+ m_Impl->Scrub(Ctx);
+}
+
+// Returns the size breakdown reported by the underlying CAS store.
+CidStoreSize
+CidStore::TotalSize() const
+{
+ return m_Impl->m_CasStore.TotalSize();
+}
+
+} // namespace zen
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
new file mode 100644
index 000000000..7b2c21b0f
--- /dev/null
+++ b/src/zenstore/compactcas.cpp
@@ -0,0 +1,1511 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "compactcas.h"
+
+#include "cas.h"
+
+#include <zencore/compress.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zenstore/scrubcontext.h>
+
+#include <gsl/gsl-lite.hpp>
+
+#include <xxhash.h>
+
+#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <zenstore/cidstore.h>
+# include <algorithm>
+# include <random>
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+// Header at the start of the container index snapshot file. The size is pinned to
+// 32 bytes by the static_assert that follows the struct.
+struct CasDiskIndexHeader
+{
+    static constexpr uint32_t ExpectedMagic  = 0x75696478;  // 'uidx';
+    static constexpr uint32_t CurrentVersion = 1;
+
+    uint32_t Magic            = ExpectedMagic;
+    uint32_t Version          = CurrentVersion;
+    uint64_t EntryCount       = 0;
+    uint64_t LogPosition      = 0;
+    uint32_t PayloadAlignment = 0;
+    uint32_t Checksum         = 0;
+
+    // Hashes the header starting at Magic, leaving out the final sizeof(uint32_t) bytes
+    // (the trailing Checksum member).
+    static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header)
+    {
+        constexpr size_t   ChecksummedBytes = sizeof(CasDiskIndexHeader) - sizeof(uint32_t);
+        constexpr uint32_t ChecksumSeed     = 0xC0C0'BABA;
+        return XXH32(&Header.Magic, ChecksummedBytes, ChecksumSeed);
+    }
+};
+
+static_assert(sizeof(CasDiskIndexHeader) == 32);
+
+namespace {
+    // File extensions of the index snapshot and the append-only log of a container.
+    const char* IndexExtension = ".uidx";
+    const char* LogExtension   = ".ulog";
+
+    // Directory holding all files of the container: <RootPath>/<ContainerBaseName>
+    std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+    {
+        return RootPath / ContainerBaseName;
+    }
+
+    // Index snapshot file: <base>/<ContainerBaseName>.uidx
+    std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+    {
+        return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension);
+    }
+
+    // Temporary location the previous index snapshot is moved to while a new one is written:
+    // <base>/<ContainerBaseName>.tmp.uidx
+    // It is an index file, so it carries the index extension rather than the log extension.
+    std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+    {
+        return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + IndexExtension);
+    }
+
+    // Append-only log file: <base>/<ContainerBaseName>.ulog
+    std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+    {
+        return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension);
+    }
+
+    // Directory passed to the block store: <base>/blocks
+    std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+    {
+        return GetBasePath(RootPath, ContainerBaseName) / "blocks";
+    }
+
+    // Sanity-checks an index/log entry. Returns true when the entry is usable; otherwise
+    // returns false and describes the problem in OutReason. OutReason is left untouched
+    // for valid entries.
+    bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason)
+    {
+        if (Entry.Key == IoHash::Zero)
+        {
+            OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+            return false;
+        }
+        // kTombstone is the only flag bit accepted here
+        if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0)
+        {
+            OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
+            return false;
+        }
+        // Content type and size are not checked for tombstones
+        if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+        {
+            return true;
+        }
+        if (Entry.ContentType != ZenContentType::kUnknownContentType)
+        {
+            OutReason =
+                fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString());
+            return false;
+        }
+        uint64_t Size = Entry.Location.GetSize();
+        if (Size == 0)
+        {
+            OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+            return false;
+        }
+        return true;
+    }
+
+}  // namespace
+
+//////////////////////////////////////////////////////////////////////////
+
+// Passes Gc to the GcStorage base and picks up the "containercas" logger.
+CasContainerStrategy::CasContainerStrategy(GcManager& Gc)
+: GcStorage(Gc)
+, m_Log(logging::Get("containercas"))
+{
+}
+
+// Nothing to do explicitly; members release their resources in their own destructors.
+CasContainerStrategy::~CasContainerStrategy() = default;
+
+// One-time setup of the container: records the configuration, derives the blocks
+// directory and opens (or creates) the container files.
+//
+// Alignment must be a power of two and MaxBlockSize must be non-zero; calling this
+// on an already initialized instance trips an assert.
+void
+CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory,
+                                 const std::string_view       ContainerBaseName,
+                                 uint32_t                     MaxBlockSize,
+                                 uint64_t                     Alignment,
+                                 bool                         IsNewStore)
+{
+    ZEN_ASSERT(IsPow2(Alignment));
+    ZEN_ASSERT(!m_IsInitialized);
+    ZEN_ASSERT(MaxBlockSize > 0);
+
+    // Block store parameters
+    m_MaxBlockSize     = MaxBlockSize;
+    m_PayloadAlignment = Alignment;
+
+    // Locations on disk
+    m_RootDirectory     = RootDirectory;
+    m_ContainerBaseName = ContainerBaseName;
+    m_BlocksBasePath    = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName);
+
+    OpenContainer(IsNewStore);
+
+    m_IsInitialized = true;
+}
+
+// Writes the chunk to the block store and records its location under ChunkHash.
+// Returns {.New = false} without writing when the hash is already in the location map.
+CasStore::InsertResult
+CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
+{
+    const bool AlreadyKnown = [&] {
+        RwLock::SharedLockScope _(m_LocationMapLock);
+        return m_LocationMap.contains(ChunkHash);
+    }();
+    if (AlreadyKnown)
+    {
+        return CasStore::InsertResult{.New = false};
+    }
+
+    // We can end up in a situation that InsertChunk writes the same chunk data in
+    // different locations.
+    // We release the insert lock once we have the correct WriteBlock ready and we know
+    // where to write the data. If a new InsertChunk request for the same chunk hash/data
+    // comes in before we update m_LocationMap below we will have a race.
+    // The outcome of that is that we will write the chunk data in more than one location
+    // but the chunk hash will only point to one of the chunks.
+    // We will in that case waste space until the next GC operation.
+    //
+    // This should be a rare occasion and the current flow reduces the time we block for
+    // reads, insert and GC.
+
+    // Invoked by the block store with the location the chunk was written to: log it
+    // first, then publish it in the lookup map.
+    const auto RecordLocation = [&](const BlockStoreLocation& Location) {
+        BlockStoreDiskLocation  DiskLocation(Location, m_PayloadAlignment);
+        const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
+        m_CasLog.Append(IndexEntry);
+        {
+            RwLock::ExclusiveLockScope _(m_LocationMapLock);
+            m_LocationMap.emplace(ChunkHash, DiskLocation);
+        }
+    };
+    m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, RecordLocation);
+
+    return CasStore::InsertResult{.New = true};
+}
+
+// Convenience overload: inserts the bytes held by Chunk under ChunkHash.
+// Outside of test builds the buffer must be tagged as compressed binary.
+CasStore::InsertResult
+CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
+{
+#if !ZEN_WITH_TESTS
+    ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
+    const void* const Payload     = Chunk.Data();
+    const auto        PayloadSize = Chunk.Size();
+    return InsertChunk(Payload, PayloadSize, ChunkHash);
+}
+
+// Returns the chunk registered under ChunkHash, or an empty IoBuffer when the hash is
+// not in the location map. The shared lock is held for the whole lookup.
+IoBuffer
+CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
+{
+    RwLock::SharedLockScope _(m_LocationMapLock);
+    if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
+    {
+        const BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment);
+        return m_BlockStore.TryGetChunk(Location);
+    }
+    return IoBuffer();
+}
+
+// Reports whether ChunkHash is present in the location map (checked under the shared lock).
+bool
+CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
+{
+    RwLock::SharedLockScope _(m_LocationMapLock);
+    const bool              IsKnown = m_LocationMap.find(ChunkHash) != m_LocationMap.end();
+    return IsKnown;
+}
+
+// Removes from InOutChunks every hash this container already has.
+void
+CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
+{
+    // This implementation is good enough for relatively small
+    // chunk sets (in terms of chunk identifiers), but would
+    // benefit from a better implementation which removes
+    // items incrementally for large sets, especially when
+    // we're likely to already have a large proportion of the
+    // chunks in the set
+
+    const auto AlreadyStored = [&](const IoHash& Candidate) { return HaveChunk(Candidate); };
+    InOutChunks.RemoveHashesIf(AlreadyStored);
+}
+
+// Flushes the block store and the log, then refreshes the index snapshot.
+void
+CasContainerStrategy::Flush()
+{
+    // 1. chunk payloads
+    m_BlockStore.Flush();
+
+    // 2. the log describing where the payloads live
+    m_CasLog.Flush();
+
+    // 3. the index snapshot (a no-op when the log has not grown since the last snapshot)
+    MakeIndexSnapshot();
+}
+
+// Validates every chunk referenced by the location map and reports the outcome to Ctx.
+//
+// A chunk is considered bad when its data cannot be obtained, or when its compressed
+// header does not validate / does not carry the hash it is stored under. When
+// Ctx.RunRecovery() is set, bad chunks are removed from the location map and a
+// tombstone is appended to the log for each of them. The bad keys are always passed
+// to Ctx.ReportBadCidChunks(), also when the list is empty.
+void
+CasContainerStrategy::Scrub(ScrubContext& Ctx)
+{
+ std::vector<IoHash> BadKeys;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
+ std::vector<BlockStoreLocation> ChunkLocations;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+
+ // Held while the locations are collected and while the block store iterates the chunks;
+ // released explicitly via ReleaseNow() once iteration is done
+ RwLock::SharedLockScope _(m_LocationMapLock);
+
+ uint64_t TotalChunkCount = m_LocationMap.size();
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
+ {
+ // Build two parallel arrays: index i in ChunkLocations belongs to hash i in ChunkIndexToChunkHash
+ for (const auto& Entry : m_LocationMap)
+ {
+ const IoHash& ChunkHash = Entry.first;
+ const BlockStoreDiskLocation& DiskLocation = Entry.second;
+ BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
+
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash.push_back(ChunkHash);
+ }
+ }
+
+ // Callback for chunks handed over as an in-memory range (Data/Size)
+ const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
+
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ if (!Data)
+ {
+ // ChunkLocation out of range of stored blocks
+ BadKeys.push_back(Hash);
+ return;
+ }
+
+ IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ if (RawHash != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
+ return;
+ }
+#if ZEN_WITH_TESTS
+ // Test builds only: also accept a payload whose plain content hash equals the key
+ // (the tests in this file insert such chunks)
+ IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
+ if (ComputedHash == Hash)
+ {
+ return;
+ }
+#endif
+ BadKeys.push_back(Hash);
+ };
+
+ // Callback for chunks handed over as a byte range (Offset/Size) of a block file
+ const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
+
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ // TODO: Add API to verify compressed buffer without having to memorymap the whole file
+ if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ if (RawHash != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
+ return;
+ }
+#if ZEN_WITH_TESTS
+ // Test builds only: stream the byte range through a hasher and accept the chunk
+ // when the plain content hash equals the key
+ IoHashStream Hasher;
+ File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ IoHash ComputedHash = Hasher.GetHash();
+ if (ComputedHash == Hash)
+ {
+ return;
+ }
+#endif
+ BadKeys.push_back(Hash);
+ };
+
+ m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+
+ // Drop the shared lock before reporting; the recovery path below takes the exclusive lock
+ _.ReleaseNow();
+
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
+
+ if (!BadKeys.empty())
+ {
+ ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
+
+ if (Ctx.RunRecovery())
+ {
+ // Deal with bad chunks by removing them from our lookup map
+
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ for (const IoHash& ChunkHash : BadKeys)
+ {
+ const auto KeyIt = m_LocationMap.find(ChunkHash);
+ if (KeyIt == m_LocationMap.end())
+ {
+ // Might have been GC'd
+ continue;
+ }
+ LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
+ m_LocationMap.erase(KeyIt);
+ }
+ }
+ // The tombstones are appended after the exclusive lock has been released
+ m_CasLog.Append(LogEntries);
+ }
+ }
+
+ // Let whomever it concerns know about the bad chunks. This could
+ // be used to invalidate higher level data structures more efficiently
+ // than a full validation pass might be able to do
+ Ctx.ReportBadCidChunks(BadKeys);
+
+ ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
+}
+
+// Reclaims space held by chunks that GcCtx does not ask to keep.
+//
+// When deletion is not enabled (GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects()
+// is false) the block store is only asked for a dry run and the index is left untouched.
+// Otherwise moved and removed chunks are written to the log, applied to m_LocationMap,
+// and the removed hashes are reported through GcCtx.AddDeletedCids().
+void
+CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
+{
+    // It collects all the blocks that we want to delete chunks from. For each such
+    // block we keep a list of chunks to retain and a list of chunks to delete.
+    //
+    // If there is a block that we are currently writing to, that block is omitted
+    // from the garbage collection.
+    //
+    // Next it will iterate over all blocks that we want to remove chunks from.
+    // If the block is empty after removal of chunks we mark the block as pending
+    // delete - we want to delete it as soon as there are no IoBuffers using the
+    // block file.
+    // Once complete we update the m_LocationMap by removing the chunks.
+    //
+    // If the block is non-empty we write out the chunks we want to keep to a new
+    // block file (creating new block files as needed).
+    //
+    // We update the index as we complete each new block file. This makes it possible
+    // to break the GC if we want to limit time for execution.
+    //
+    // GC can run in parallel with regular operation - it will block while taking
+    // a snapshot of the current m_LocationMap state and while moving blocks it will
+    // do a blocking operation and update the m_LocationMap after each new block is
+    // written and figuring out the path to the next new block.
+
+    ZEN_DEBUG("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName);
+
+    uint64_t WriteBlockTimeUs        = 0;
+    uint64_t WriteBlockLongestTimeUs = 0;
+    uint64_t ReadBlockTimeUs         = 0;
+    uint64_t ReadBlockLongestTimeUs  = 0;
+
+    // Snapshot the lookup map and the block store state under the shared lock; the rest
+    // of the collection works on these copies
+    LocationMap_t                    LocationMap;
+    BlockStore::ReclaimSnapshotState BlockStoreState;
+    {
+        RwLock::SharedLockScope ___(m_LocationMapLock);
+        Stopwatch               Timer;
+        const auto              ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+            uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+            WriteBlockTimeUs += ElapsedUs;
+            WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+        });
+        LocationMap     = m_LocationMap;
+        BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
+    }
+
+    uint64_t TotalChunkCount = LocationMap.size();
+
+    std::vector<IoHash> TotalChunkHashes;
+    TotalChunkHashes.reserve(TotalChunkCount);
+    for (const auto& Entry : LocationMap)
+    {
+        TotalChunkHashes.push_back(Entry.first);
+    }
+
+    // ChunkLocations and ChunkIndexToChunkHash are parallel arrays indexed by chunk index;
+    // KeepChunkIndexes lists the indexes of the chunks that must survive
+    std::vector<BlockStoreLocation> ChunkLocations;
+    BlockStore::ChunkIndexArray     KeepChunkIndexes;
+    std::vector<IoHash>             ChunkIndexToChunkHash;
+    ChunkLocations.reserve(TotalChunkCount);
+    ChunkIndexToChunkHash.reserve(TotalChunkCount);
+
+    GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+        auto                          KeyIt        = LocationMap.find(ChunkHash);
+        const BlockStoreDiskLocation& DiskLocation = KeyIt->second;
+        BlockStoreLocation            Location     = DiskLocation.Get(m_PayloadAlignment);
+        size_t                        ChunkIndex   = ChunkLocations.size();
+
+        // Both vectors are only reserved, so they have to grow through push_back;
+        // indexing into the reserved-but-empty vector would be out of bounds
+        ChunkLocations.push_back(Location);
+        ChunkIndexToChunkHash.push_back(ChunkHash);
+        if (Keep)
+        {
+            KeepChunkIndexes.push_back(ChunkIndex);
+        }
+    });
+
+    const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+    if (!PerformDelete)
+    {
+        m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
+        return;
+    }
+
+    std::vector<IoHash> DeletedChunks;
+    m_BlockStore.ReclaimSpace(
+        BlockStoreState,
+        ChunkLocations,
+        KeepChunkIndexes,
+        m_PayloadAlignment,
+        false,
+        [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
+            // Translate the block store's report into log entries: a plain entry with the
+            // new location for each moved chunk, a tombstone for each removed chunk
+            std::vector<CasDiskIndexEntry> LogEntries;
+            LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
+            for (const auto& Entry : MovedChunks)
+            {
+                size_t                    ChunkIndex  = Entry.first;
+                const BlockStoreLocation& NewLocation = Entry.second;
+                const IoHash&             ChunkHash   = ChunkIndexToChunkHash[ChunkIndex];
+                LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}});
+            }
+            for (const size_t ChunkIndex : RemovedChunks)
+            {
+                const IoHash&                 ChunkHash       = ChunkIndexToChunkHash[ChunkIndex];
+                const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash];
+                LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone});
+                DeletedChunks.push_back(ChunkHash);
+            }
+
+            // Log first, then apply the same entries to the live lookup map
+            m_CasLog.Append(LogEntries);
+            m_CasLog.Flush();
+            {
+                RwLock::ExclusiveLockScope __(m_LocationMapLock);
+                Stopwatch                  Timer;
+                const auto                 ____ = MakeGuard([&] {
+                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                    ReadBlockTimeUs += ElapsedUs;
+                    ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                });
+                for (const CasDiskIndexEntry& Entry : LogEntries)
+                {
+                    if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+                    {
+                        m_LocationMap.erase(Entry.Key);
+                        continue;
+                    }
+                    m_LocationMap[Entry.Key] = Entry.Location;
+                }
+            }
+        },
+        [&GcCtx]() { return GcCtx.CollectSmallObjects(); });
+
+    GcCtx.AddDeletedCids(DeletedChunks);
+}
+
+// Writes the current location map to the index snapshot file.
+//
+// Nothing is written when the log has not grown since the last snapshot. The existing
+// snapshot is moved to a temporary file while the new one is written and is restored if
+// writing throws a std::exception; such an exception is logged and not propagated.
+void
+CasContainerStrategy::MakeIndexSnapshot()
+{
+    uint64_t LogCount = m_CasLog.GetLogCount();
+    if (m_LogFlushPosition == LogCount)
+    {
+        return;
+    }
+
+    ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName);
+    uint64_t   EntryCount = 0;
+    Stopwatch  Timer;
+    const auto _ = MakeGuard([&] {
+        ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
+                 m_RootDirectory / m_ContainerBaseName,
+                 EntryCount,
+                 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+    });
+
+    namespace fs = std::filesystem;
+
+    fs::path IndexPath     = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
+    fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName);
+
+    // Move index away, we keep it if something goes wrong
+    if (fs::is_regular_file(TempIndexPath))
+    {
+        fs::remove(TempIndexPath);
+    }
+    if (fs::is_regular_file(IndexPath))
+    {
+        fs::rename(IndexPath, TempIndexPath);
+    }
+
+    try
+    {
+        // Write the current state of the location map to a new index state
+        std::vector<CasDiskIndexEntry> Entries;
+
+        {
+            RwLock::SharedLockScope ___(m_LocationMapLock);
+            Entries.resize(m_LocationMap.size());
+
+            uint64_t EntryIndex = 0;
+            for (auto& Entry : m_LocationMap)
+            {
+                CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++];
+                IndexEntry.Key                = Entry.first;
+                IndexEntry.Location           = Entry.second;
+            }
+        }
+
+        BasicFile ObjectIndexFile;
+        ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+        CasDiskIndexHeader Header = {.EntryCount       = Entries.size(),
+                                     .LogPosition      = LogCount,
+                                     .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+        Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header);
+
+        // File layout: the header at offset 0, followed directly by the entries. The size
+        // and offset are expressed with the header type so the layout matches what
+        // ReadIndexFile() reads regardless of the size of CasDiskIndexEntry.
+        ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexHeader), 0);
+        ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+        ObjectIndexFile.Flush();
+        ObjectIndexFile.Close();
+        EntryCount         = Entries.size();
+        m_LogFlushPosition = LogCount;
+    }
+    catch (const std::exception& Err)
+    {
+        ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
+
+        // Restore any previous snapshot
+
+        if (fs::is_regular_file(TempIndexPath))
+        {
+            fs::remove(IndexPath);
+            fs::rename(TempIndexPath, IndexPath);
+        }
+    }
+    // On success the old snapshot is no longer needed
+    if (fs::is_regular_file(TempIndexPath))
+    {
+        fs::remove(TempIndexPath);
+    }
+}
+
+// Loads the index snapshot, if one exists, into m_LocationMap.
+//
+// Returns the log position stored in the snapshot header, i.e. the number of log entries
+// the snapshot already covers. Returns 0 when there is no snapshot file, when it is too
+// small to hold a header, or when the header does not validate. Entries that fail
+// ValidateEntry() are skipped with a warning. On success m_PayloadAlignment is replaced
+// by the value stored in the header.
+uint64_t
+CasContainerStrategy::ReadIndexFile()
+{
+    std::vector<CasDiskIndexEntry> Entries;
+    std::filesystem::path          IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
+    if (std::filesystem::is_regular_file(IndexPath))
+    {
+        Stopwatch  Timer;
+        const auto _ = MakeGuard([&] {
+            ZEN_INFO("read store '{}' index containing {} entries in {}",
+                     IndexPath,
+                     Entries.size(),
+                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+        });
+
+        BasicFile ObjectIndexFile;
+        ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+        uint64_t Size = ObjectIndexFile.FileSize();
+        if (Size >= sizeof(CasDiskIndexHeader))
+        {
+            // Upper bound for the entry count: whatever fits in the file after the header.
+            // The subtraction cannot underflow because of the size check above.
+            uint64_t           ExpectedEntryCount = (Size - sizeof(CasDiskIndexHeader)) / sizeof(CasDiskIndexEntry);
+            CasDiskIndexHeader Header;
+            ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+            if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
+                (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+                (Header.EntryCount <= ExpectedEntryCount))
+            {
+                Entries.resize(Header.EntryCount);
+                ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+                m_PayloadAlignment = Header.PayloadAlignment;
+
+                std::string InvalidEntryReason;
+                for (const CasDiskIndexEntry& Entry : Entries)
+                {
+                    if (!ValidateEntry(Entry, InvalidEntryReason))
+                    {
+                        ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+                        continue;
+                    }
+                    m_LocationMap[Entry.Key] = Entry.Location;
+                }
+
+                return Header.LogPosition;
+            }
+            else
+            {
+                ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+            }
+        }
+    }
+    return 0;
+}
+
+// Replays the log into m_LocationMap, skipping the first SkipEntryCount entries (those
+// already covered by the index snapshot).
+//
+// Returns the number of log entries that were replayed; 0 when there is no log file or
+// it fails to initialize. When SkipEntryCount is larger than the log, the whole log is
+// replayed instead. Tombstones erase their key; other entries that fail ValidateEntry()
+// are skipped with a warning.
+uint64_t
+CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
+{
+    std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
+    if (std::filesystem::is_regular_file(LogPath))
+    {
+        size_t     LogEntryCount = 0;
+        Stopwatch  Timer;
+        const auto _ = MakeGuard([&] {
+            ZEN_INFO("read store '{}' log containing {} entries in {}",
+                     m_RootDirectory / m_ContainerBaseName,
+                     LogEntryCount,
+                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+        });
+
+        TCasLogFile<CasDiskIndexEntry> CasLog;
+        CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+        if (CasLog.Initialize())
+        {
+            uint64_t EntryCount = CasLog.GetLogCount();
+            if (EntryCount < SkipEntryCount)
+            {
+                ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+                SkipEntryCount = 0;
+            }
+            // The replayed count is fixed up front; the replay callback must not count
+            // again or every entry would be reported twice
+            LogEntryCount = EntryCount - SkipEntryCount;
+            CasLog.Replay(
+                [&](const CasDiskIndexEntry& Record) {
+                    std::string InvalidEntryReason;
+                    if (Record.Flags & CasDiskIndexEntry::kTombstone)
+                    {
+                        m_LocationMap.erase(Record.Key);
+                        return;
+                    }
+                    if (!ValidateEntry(Record, InvalidEntryReason))
+                    {
+                        ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+                        return;
+                    }
+                    m_LocationMap[Record.Key] = Record.Location;
+                },
+                SkipEntryCount);
+            return LogEntryCount;
+        }
+    }
+    return 0;
+}
+
+// Rebuilds the in-memory state from disk: index snapshot first, then the log entries the
+// snapshot does not cover, then opens the log for writing and initializes the block store
+// with the known chunk locations. With IsNewStore set, the container directory is wiped
+// before anything is read.
+void
+CasContainerStrategy::OpenContainer(bool IsNewStore)
+{
+    // Add .running file and delete on clean on close to detect bad termination
+
+    m_LocationMap.clear();
+
+    std::filesystem::path ContainerDir = GetBasePath(m_RootDirectory, m_ContainerBaseName);
+    if (IsNewStore)
+    {
+        std::filesystem::remove_all(ContainerDir);
+    }
+
+    m_LogFlushPosition              = ReadIndexFile();
+    const uint64_t ReplayedLogCount = ReadLog(m_LogFlushPosition);
+
+    CreateDirectories(ContainerDir);
+
+    std::filesystem::path WritableLogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
+    m_CasLog.Open(WritableLogPath, CasLogFile::Mode::kWrite);
+
+    // Tell the block store which locations are referenced by the lookup map
+    std::vector<BlockStoreLocation> ReferencedLocations;
+    ReferencedLocations.reserve(m_LocationMap.size());
+    for (const auto& KeyAndLocation : m_LocationMap)
+    {
+        ReferencedLocations.push_back(KeyAndLocation.second.Get(m_PayloadAlignment));
+    }
+
+    m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, ReferencedLocations);
+
+    // Snapshot right away when the store is new or the log held entries the snapshot lacked
+    const bool SnapshotIsStale = IsNewStore || (ReplayedLogCount > 0);
+    if (SnapshotIsStale)
+    {
+        MakeIndexSnapshot();
+    }
+
+    // TODO: should validate integrity of container files here
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+namespace {
+    // Test helper: returns a buffer of Size bytes holding the values 0, 1, 2, ...
+    // (wrapping after 255) in shuffled order.
+    static IoBuffer CreateRandomChunk(uint64_t Size)
+    {
+        static std::random_device rd;
+        static std::mt19937       g(rd());
+
+        std::vector<uint8_t> Bytes(Size);
+        uint8_t              NextValue = 0;
+        for (uint8_t& Slot : Bytes)
+        {
+            Slot = NextValue++;
+        }
+        std::shuffle(Bytes.begin(), Bytes.end(), g);
+
+        return IoBufferBuilder::MakeCloneFromMemory(Bytes.data(), Bytes.size());
+    }
+}  // namespace
+
+// Round-trips a handful of 32-bit values through ToHexNumber / ParseHexNumber.
+TEST_CASE("compactcas.hex")
+{
+    uint32_t Value;
+    CHECK(!ParseHexNumber("", Value));
+
+    struct RoundTripCase
+    {
+        uint32_t    Number;
+        const char* ExpectedHex;  // nullptr: the textual form is not checked for this case
+    };
+    const RoundTripCase Cases[] = {
+        {0u, nullptr},
+        {std::numeric_limits<std::uint32_t>::max(), "ffffffff"},
+        {0xadf14711u, "adf14711"},
+        {0x80000000u, "80000000"},
+        {0x718293a4u, "718293a4"},
+    };
+
+    char Hex[9];
+    for (const RoundTripCase& Case : Cases)
+    {
+        ToHexNumber(Case.Number, Hex);
+        const std::string HexString(Hex);
+        if (Case.ExpectedHex != nullptr)
+        {
+            CHECK(HexString == Case.ExpectedHex);
+        }
+        CHECK(ParseHexNumber(HexString, Value));
+        CHECK(Value == Case.Number);
+    }
+}
+
+// Inserts a series of compact binary objects, reads them back, then re-opens the store
+// and reads them back again.
+TEST_CASE("compactcas.compact.gc")
+{
+    ScopedTemporaryDirectory TempDir;
+
+    const int kIterationCount = 1000;
+
+    std::vector<IoHash> Keys(kIterationCount);
+
+    // Every key must resolve to an object whose "id" field equals its index
+    const auto VerifyAllChunks = [&Keys](CasContainerStrategy& Store) {
+        for (int i = 0; i < kIterationCount; ++i)
+        {
+            IoBuffer Chunk = Store.FindChunk(Keys[i]);
+
+            CHECK(!!Chunk);
+
+            CbObject Value = LoadCompactBinaryObject(Chunk);
+
+            CHECK_EQ(Value["id"].AsInt32(), i);
+        }
+    };
+
+    {
+        GcManager            Gc;
+        CasContainerStrategy Cas(Gc);
+        Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
+
+        for (int i = 0; i < kIterationCount; ++i)
+        {
+            CbObjectWriter Cbo;
+            Cbo << "id" << i;
+            CbObject Obj = Cbo.Save();
+
+            IoBuffer     ObjBuffer = Obj.GetBuffer().AsIoBuffer();
+            const IoHash Hash      = HashBuffer(ObjBuffer);
+
+            Cas.InsertChunk(ObjBuffer, Hash);
+
+            Keys[i] = Hash;
+        }
+
+        VerifyAllChunks(Cas);
+    }
+
+    // Validate that we can still read the inserted data after closing
+    // the original cas store
+
+    {
+        GcManager            Gc;
+        CasContainerStrategy Cas(Gc);
+        Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
+
+        VerifyAllChunks(Cas);
+    }
+}
+
+// Checks that the reported disk size equals the sum of the inserted chunk sizes, both
+// right after insertion and after re-opening the store.
+TEST_CASE("compactcas.compact.totalsize")
+{
+    std::random_device rd;
+    std::mt19937       g(rd());
+
+    // for (uint32_t i = 0; i < 100; ++i)
+    {
+        ScopedTemporaryDirectory TempDir;
+
+        const uint64_t kChunkSize        = 1024;
+        const int32_t  kChunkCount       = 16;
+        const uint64_t kExpectedDiskSize = kChunkSize * kChunkCount;
+
+        {
+            GcManager            Gc;
+            CasContainerStrategy Cas(Gc);
+            Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
+
+            int32_t Remaining = kChunkCount;
+            while (Remaining-- > 0)
+            {
+                IoBuffer               Chunk        = CreateRandomChunk(kChunkSize);
+                const IoHash           Hash         = HashBuffer(Chunk);
+                CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+                ZEN_ASSERT(InsertResult.New);
+            }
+
+            const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+            CHECK_EQ(kExpectedDiskSize, TotalSize);
+        }
+
+        // Re-open twice; on the second re-open we should have a snapshot
+        for (int Reopen = 0; Reopen < 2; ++Reopen)
+        {
+            GcManager            Gc;
+            CasContainerStrategy Cas(Gc);
+            Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
+
+            const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+            CHECK_EQ(kExpectedDiskSize, TotalSize);
+        }
+    }
+}
+
+// A chunk that is not retained must be gone after garbage collection.
+TEST_CASE("compactcas.gc.basic")
+{
+    ScopedTemporaryDirectory TempDir;
+
+    GcManager            Gc;
+    CasContainerStrategy Cas(Gc);
+    Cas.Initialize(TempDir.Path(), "cb", 65536, 16, true);
+
+    IoBuffer Chunk     = CreateRandomChunk(128);
+    IoHash   ChunkHash = IoHash::HashBuffer(Chunk);
+
+    CHECK(Cas.InsertChunk(Chunk, ChunkHash).New);
+    Cas.Flush();
+
+    // No retained cids are added, so the chunk is up for collection
+    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+    GcCtx.CollectSmallObjects(true);
+
+    Cas.CollectGarbage(GcCtx);
+
+    const bool StillPresent = Cas.HaveChunk(ChunkHash);
+    CHECK(!StillPresent);
+}
+
+// Same as the basic GC test, but the chunk is inserted by one store instance and
+// collected by a second instance opened on the same directory.
+TEST_CASE("compactcas.gc.removefile")
+{
+    ScopedTemporaryDirectory TempDir;
+
+    IoBuffer Chunk     = CreateRandomChunk(128);
+    IoHash   ChunkHash = IoHash::HashBuffer(Chunk);
+
+    // First instance: insert once, confirm the duplicate insert is rejected, flush
+    {
+        GcManager            Gc;
+        CasContainerStrategy Cas(Gc);
+        Cas.Initialize(TempDir.Path(), "cb", 65536, 16, true);
+
+        CHECK(Cas.InsertChunk(Chunk, ChunkHash).New);
+        CHECK(!Cas.InsertChunk(Chunk, ChunkHash).New);
+        Cas.Flush();
+    }
+
+    // Second instance: collect without retaining anything
+    GcManager            Gc;
+    CasContainerStrategy Cas(Gc);
+    Cas.Initialize(TempDir.Path(), "cb", 65536, 16, false);
+
+    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+    GcCtx.CollectSmallObjects(true);
+
+    Cas.CollectGarbage(GcCtx);
+
+    const bool StillPresent = Cas.HaveChunk(ChunkHash);
+    CHECK(!StillPresent);
+}
+
+TEST_CASE("compactcas.gc.compact")
+{
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true);
+
+ uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(9);
+ for (uint64_t Size : ChunkSizes)
+ {
+ Chunks.push_back(CreateRandomChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(9);
+ for (const IoBuffer& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New);
+ CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New);
+ CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New);
+ CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New);
+ CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New);
+ CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New);
+ CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New);
+ CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New);
+ CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ // Keep first and last
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[0]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.AddRetainedCids(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
+ // Keep last
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.AddRetainedCids(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
+ // Keep mixed
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[1]);
+ KeepChunks.push_back(ChunkHashes[4]);
+ KeepChunks.push_back(ChunkHashes[7]);
+ GcCtx.AddRetainedCids(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+
+ Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[8], ChunkHashes[8]);
+ }
+
+ // Keep multiple at end
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[6]);
+ KeepChunks.push_back(ChunkHashes[7]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.AddRetainedCids(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ }
+
+ // Keep every other
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[0]);
+ KeepChunks.push_back(ChunkHashes[2]);
+ KeepChunks.push_back(ChunkHashes[4]);
+ KeepChunks.push_back(ChunkHashes[6]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.AddRetainedCids(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
+ // Verify that we nicely appended blocks even after all GC operations
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
+ CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
+ CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5])));
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+ }
+}
+
+TEST_CASE("compactcas.gc.deleteblockonopen")
+{
+    // Inserts 20 small chunks, garbage-collects every odd-indexed one and then
+    // verifies that exactly the even-indexed chunks are still present and
+    // intact - first on the live store, then again after re-opening the
+    // container from the same directory.
+    static constexpr size_t kChunkCount = 20;
+
+    ScopedTemporaryDirectory TempDir;
+
+    uint64_t ChunkSizes[kChunkCount] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
+
+    std::vector<IoBuffer> Chunks;
+    Chunks.reserve(kChunkCount);
+    for (size_t Index = 0; Index < kChunkCount; ++Index)
+    {
+        Chunks.push_back(CreateRandomChunk(ChunkSizes[Index]));
+    }
+
+    std::vector<IoHash> ChunkHashes;
+    ChunkHashes.reserve(kChunkCount);
+    for (size_t Index = 0; Index < kChunkCount; ++Index)
+    {
+        const IoBuffer& Payload = Chunks[Index];
+        ChunkHashes.push_back(IoHash::HashBuffer(Payload.Data(), Payload.Size()));
+    }
+
+    // Even-indexed chunks must exist and hash back to their key, odd-indexed chunks must be gone
+    const auto ExpectOnlyEvenChunks = [&ChunkHashes](CasContainerStrategy& Store) {
+        for (size_t Even = 0; Even < kChunkCount; Even += 2)
+        {
+            const size_t Odd = Even + 1;
+            CHECK(Store.HaveChunk(ChunkHashes[Even]));
+            CHECK(!Store.HaveChunk(ChunkHashes[Odd]));
+            CHECK(ChunkHashes[Even] == IoHash::HashBuffer(Store.FindChunk(ChunkHashes[Even])));
+        }
+    };
+
+    {
+        GcManager            Gc;
+        CasContainerStrategy Cas(Gc);
+        Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);
+
+        for (size_t Index = 0; Index < kChunkCount; ++Index)
+        {
+            CHECK(Cas.InsertChunk(Chunks[Index], ChunkHashes[Index]).New);
+        }
+
+        // GC every other block
+        {
+            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+            GcCtx.CollectSmallObjects(true);
+
+            std::vector<IoHash> KeepChunks;
+            for (size_t Even = 0; Even < kChunkCount; Even += 2)
+            {
+                KeepChunks.push_back(ChunkHashes[Even]);
+            }
+            GcCtx.AddRetainedCids(KeepChunks);
+
+            Cas.Flush();
+            Cas.CollectGarbage(GcCtx);
+
+            ExpectOnlyEvenChunks(Cas);
+        }
+    }
+    {
+        // Re-open
+        GcManager            Gc;
+        CasContainerStrategy Cas(Gc);
+        Cas.Initialize(TempDir.Path(), "test", 1024, 16, false);
+
+        ExpectOnlyEvenChunks(Cas);
+    }
+}
+
+TEST_CASE("compactcas.gc.handleopeniobuffer")
+{
+    // Holds on to an IoBuffer obtained from FindChunk, garbage-collects every
+    // chunk in the store, and then checks that the retained buffer still hashes
+    // to its original key.
+    static constexpr size_t kChunkCount    = 20;
+    static constexpr size_t kRetainedIndex = 5;
+
+    ScopedTemporaryDirectory TempDir;
+
+    uint64_t ChunkSizes[kChunkCount] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
+
+    std::vector<IoBuffer> Chunks;
+    Chunks.reserve(kChunkCount);
+    for (size_t Index = 0; Index < kChunkCount; ++Index)
+    {
+        Chunks.push_back(CreateRandomChunk(ChunkSizes[Index]));
+    }
+
+    std::vector<IoHash> ChunkHashes;
+    ChunkHashes.reserve(kChunkCount);
+    for (size_t Index = 0; Index < kChunkCount; ++Index)
+    {
+        const IoBuffer& Payload = Chunks[Index];
+        ChunkHashes.push_back(IoHash::HashBuffer(Payload.Data(), Payload.Size()));
+    }
+
+    GcManager            Gc;
+    CasContainerStrategy Cas(Gc);
+    Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);
+
+    for (size_t Index = 0; Index < kChunkCount; ++Index)
+    {
+        CHECK(Cas.InsertChunk(Chunks[Index], ChunkHashes[Index]).New);
+    }
+
+    // Grab the buffer before flushing and collecting so it is alive across the GC
+    IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[kRetainedIndex]);
+    Cas.Flush();
+
+    // GC everything (no hashes are registered as retained)
+    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+    GcCtx.CollectSmallObjects(true);
+    Cas.CollectGarbage(GcCtx);
+
+    for (size_t Index = 0; Index < kChunkCount; ++Index)
+    {
+        CHECK(!Cas.HaveChunk(ChunkHashes[Index]));
+    }
+
+    // The store no longer knows the chunk, but the buffer we kept must still be readable
+    CHECK(ChunkHashes[kRetainedIndex] == IoHash::HashBuffer(RetainChunk));
+}
+
+TEST_CASE("compactcas.threadedinsert")
+{
+    // Stress test: inserts and reads chunks from a worker thread pool, then
+    // keeps inserting/reading while garbage collection passes run on the test
+    // thread, and finally verifies that every chunk the test believes survived
+    // is still present and intact.
+
+    // for (uint32_t i = 0; i < 100; ++i)
+    {
+        ScopedTemporaryDirectory TempDir;
+
+        const uint64_t kChunkSize   = 1048;
+        const int32_t  kChunkCount  = 4096;
+        uint64_t       ExpectedSize = 0;
+
+        std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
+        Chunks.reserve(kChunkCount);
+
+        // Generate kChunkCount chunks with unique hashes (regenerate on a hash collision)
+        for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+        {
+            while (true)
+            {
+                IoBuffer Chunk = CreateRandomChunk(kChunkSize);
+                IoHash   Hash  = HashBuffer(Chunk);
+                if (Chunks.contains(Hash))
+                {
+                    continue;
+                }
+                Chunks[Hash] = Chunk;
+                ExpectedSize += Chunk.Size();
+                break;
+            }
+        }
+
+        std::atomic<size_t>  WorkCompleted = 0;
+        WorkerThreadPool     ThreadPool(4);
+        GcManager            Gc;
+        CasContainerStrategy Cas(Gc);
+        Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);
+
+        // Phase 1: insert every chunk from the pool; each insert must report the chunk as new
+        {
+            for (const auto& Chunk : Chunks)
+            {
+                const IoHash&   Hash   = Chunk.first;
+                const IoBuffer& Buffer = Chunk.second;
+                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
+                    CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
+                    ZEN_ASSERT(InsertResult.New);
+                    WorkCompleted.fetch_add(1);
+                });
+            }
+            while (WorkCompleted < Chunks.size())
+            {
+                Sleep(1);
+            }
+        }
+
+        // Reported disk size must cover the payload and exceed it by at most 32768 bytes
+        WorkCompleted            = 0;
+        const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+        CHECK_LE(ExpectedSize, TotalSize);
+        CHECK_GE(ExpectedSize + 32768, TotalSize);
+
+        // Phase 2: read every chunk back from the pool and verify its content hash
+        {
+            for (const auto& Chunk : Chunks)
+            {
+                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
+                    IoHash   ChunkHash = Chunk.first;
+                    IoBuffer Buffer    = Cas.FindChunk(ChunkHash);
+                    IoHash   Hash      = IoHash::HashBuffer(Buffer);
+                    CHECK(ChunkHash == Hash);
+                    WorkCompleted.fetch_add(1);
+                });
+            }
+            while (WorkCompleted < Chunks.size())
+            {
+                Sleep(1);
+            }
+        }
+
+        // Hashes the test currently expects to be present in the store
+        std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes;
+        GcChunkHashes.reserve(Chunks.size());
+        for (const auto& Chunk : Chunks)
+        {
+            GcChunkHashes.insert(Chunk.first);
+        }
+
+        // Phase 3: insert new chunks and read old ones from the pool while GC runs on this thread
+        {
+            WorkCompleted = 0;
+            std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
+            NewChunks.reserve(kChunkCount);
+
+            for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+            {
+                IoBuffer Chunk  = CreateRandomChunk(kChunkSize);
+                IoHash   Hash   = HashBuffer(Chunk);
+                NewChunks[Hash] = Chunk;
+            }
+
+            // Explicitly start at zero; the counter is read by the loop below before any worker may have run
+            std::atomic_uint32_t AddedChunkCount{0};
+
+            for (const auto& Chunk : NewChunks)
+            {
+                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
+                    Cas.InsertChunk(Chunk.second, Chunk.first);
+                    AddedChunkCount.fetch_add(1);
+                    WorkCompleted.fetch_add(1);
+                });
+            }
+            for (const auto& Chunk : Chunks)
+            {
+                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
+                    IoHash   ChunkHash = Chunk.first;
+                    IoBuffer Buffer    = Cas.FindChunk(ChunkHash);
+                    // The chunk may already have been collected, only verify it if it is still there
+                    if (Buffer)
+                    {
+                        CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
+                    }
+                    WorkCompleted.fetch_add(1);
+                });
+            }
+
+            // One GC pass: pick up newly visible chunks, retain most (but deliberately not all)
+            // known hashes, collect, and forget whatever the GC reports as deleted.
+            const auto RunGcPass = [&Cas, &NewChunks, &GcChunkHashes]() {
+                // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+                for (const auto& Chunk : NewChunks)
+                {
+                    if (Cas.HaveChunk(Chunk.first))
+                    {
+                        GcChunkHashes.emplace(Chunk.first);
+                    }
+                }
+                std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+
+                // At every 155th position, drop the hash at that slot and the one three slots
+                // further on (swap-with-last + pop) so the GC has something to delete
+                size_t C = 0;
+                while (C < KeepHashes.size())
+                {
+                    if (C % 155 == 0)
+                    {
+                        if (C < KeepHashes.size() - 1)
+                        {
+                            KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+                            KeepHashes.pop_back();
+                        }
+                        if (C + 3 < KeepHashes.size() - 1)
+                        {
+                            KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+                            KeepHashes.pop_back();
+                        }
+                    }
+                    C++;
+                }
+
+                GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+                GcCtx.CollectSmallObjects(true);
+                GcCtx.AddRetainedCids(KeepHashes);
+                Cas.CollectGarbage(GcCtx);
+                const HashKeySet& Deleted = GcCtx.DeletedCids();
+                Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+            };
+
+            // Keep collecting for as long as inserts are still in flight
+            while (AddedChunkCount.load() < NewChunks.size())
+            {
+                RunGcPass();
+            }
+
+            while (WorkCompleted < NewChunks.size() + Chunks.size())
+            {
+                Sleep(1);
+            }
+
+            // One final pass once all workers are done
+            RunGcPass();
+        }
+
+        // Phase 4: everything still tracked as alive must be present and intact
+        {
+            WorkCompleted = 0;
+            for (const IoHash& ChunkHash : GcChunkHashes)
+            {
+                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
+                    CHECK(Cas.HaveChunk(ChunkHash));
+                    CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+                    WorkCompleted.fetch_add(1);
+                });
+            }
+            while (WorkCompleted < GcChunkHashes.size())
+            {
+                Sleep(1);
+            }
+        }
+    }
+}
+
+#endif
+
+// Intentionally empty function.
+// NOTE(review): presumably exists only so that other code can reference a symbol
+// from this translation unit and thereby keep the linker from discarding it
+// (including the test cases above) - confirm against the callers.
+void
+compactcas_forcelink()
+{
+}
+
+} // namespace zen
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
new file mode 100644
index 000000000..b0c6699eb
--- /dev/null
+++ b/src/zenstore/compactcas.h
@@ -0,0 +1,95 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+#include <zenstore/blockstore.h>
+#include <zenstore/caslog.h>
+#include <zenstore/gc.h>
+
+#include "cas.h"
+
+#include <atomic>
+#include <limits>
+#include <unordered_map>
+
+namespace spdlog {
+class logger;
+}
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+#pragma pack(push)
+#pragma pack(1)
+
+// One fixed-size record describing a chunk: its hash, where it is located in the
+// block store, its content type and a flag byte. The enclosing `#pragma pack(1)`
+// and the static_assert that follows pin the record to exactly 32 bytes, so
+// fields must not be added, removed or reordered.
+struct CasDiskIndexEntry
+{
+    // Bit value for `Flags`. constexpr (implicitly inline) so the constant can be
+    // odr-used without needing an out-of-class definition.
+    // NOTE(review): by its name this marks a record as a deletion - confirm against the log reader.
+    static constexpr uint8_t kTombstone = 0x01;
+
+    IoHash                 Key;
+    BlockStoreDiskLocation Location;
+    ZenContentType         ContentType = ZenContentType::kUnknownContentType;
+    uint8_t                Flags       = 0;
+};
+
+#pragma pack(pop)
+
+static_assert(sizeof(CasDiskIndexEntry) == 32);
+
+/** Storage strategy for small CAS values
+ *
+ * A new chunk is simply appended to a small object file. An index is kept
+ * alongside so that chunks can be looked up within the active small object
+ * files.
+ *
+ */
+
+struct CasContainerStrategy final : public GcStorage
+{
+    CasContainerStrategy(GcManager& Gc);
+    ~CasContainerStrategy();
+
+    // Chunk operations, all keyed by the chunk's content hash
+    CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
+    IoBuffer               FindChunk(const IoHash& ChunkHash);
+    bool                   HaveChunk(const IoHash& ChunkHash);
+    void                   FilterChunks(HashKeySet& InOutChunks);
+
+    // Lifecycle and maintenance
+    void Initialize(const std::filesystem::path& RootDirectory,
+                    const std::string_view       ContainerBaseName,
+                    uint32_t                     MaxBlockSize,
+                    uint64_t                     Alignment,
+                    bool                         IsNewStore);
+    void Flush();
+    void Scrub(ScrubContext& Ctx);
+
+    // GcStorage overrides
+    void          CollectGarbage(GcContext& GcCtx) override;
+    GcStorageSize StorageSize() const override
+    {
+        // Disk usage is whatever the underlying block store reports
+        return {.DiskSize = m_BlockStore.TotalSize()};
+    }
+
+private:
+    CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
+    void                   MakeIndexSnapshot();
+    uint64_t               ReadIndexFile();
+    uint64_t               ReadLog(uint64_t SkipEntryCount);
+    void                   OpenContainer(bool IsNewStore);
+
+    spdlog::logger& Log() { return m_Log; }
+
+    // NOTE: declaration order below is also initialization order - keep it as is
+    std::filesystem::path          m_RootDirectory;
+    spdlog::logger&                m_Log;
+    uint64_t                       m_PayloadAlignment = 1u << 4;   // 16
+    uint64_t                       m_MaxBlockSize     = 1u << 28;  // 256 MiB
+    bool                           m_IsInitialized    = false;
+    TCasLogFile<CasDiskIndexEntry> m_CasLog;
+    uint64_t                       m_LogFlushPosition = 0;
+    std::string                    m_ContainerBaseName;
+    std::filesystem::path          m_BlocksBasePath;
+    BlockStore                     m_BlockStore;
+
+    // Chunk hash -> location of the chunk in the block store, with its lock
+    RwLock m_LocationMapLock;
+    using LocationMap_t = std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher>;
+    LocationMap_t m_LocationMap;
+};
+
+// NOTE(review): presumably a no-op hook that callers reference to make sure the
+// compactcas translation unit gets linked in - confirm against the definition.
+void compactcas_forcelink();
+
+} // namespace zen
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
new file mode 100644
index 000000000..1d25920c4
--- /dev/null
+++ b/src/zenstore/filecas.cpp
@@ -0,0 +1,1452 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "filecas.h"
+
+#include <zencore/compress.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory.h>
+#include <zencore/scopeguard.h>
+#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zencore/uid.h>
+#include <zenstore/gc.h>
+#include <zenstore/scrubcontext.h>
+#include <zenutil/basicfile.h>
+
+#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+#endif
+
+#include <gsl/gsl-lite.hpp>
+
+#include <barrier>
+#include <filesystem>
+#include <functional>
+#include <unordered_map>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <xxhash.h>
+#if ZEN_PLATFORM_WINDOWS
+# include <atlfile.h>
+#endif
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+namespace filecas::impl {
+    // File extensions used for the index file and the log file
+    const char* IndexExtension = ".uidx";
+    const char* LogExtension   = ".ulog";
+
+    // Path of the index file: <RootDir>/cas.uidx
+    std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir)
+    {
+        const auto FileName = fmt::format("cas{}", IndexExtension);
+        return RootDir / FileName;
+    }
+
+    // Path of the temporary index file: <RootDir>/cas.tmp.uidx
+    std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir)
+    {
+        const auto FileName = fmt::format("cas.tmp{}", IndexExtension);
+        return RootDir / FileName;
+    }
+
+    // Path of the log file: <RootDir>/cas.ulog
+    std::filesystem::path GetLogPath(const std::filesystem::path& RootDir)
+    {
+        const auto FileName = fmt::format("cas{}", LogExtension);
+        return RootDir / FileName;
+    }
+
+#pragma pack(push)
+#pragma pack(1)
+
+    // Fixed 32 byte header (byte-packed, see the static_assert below).
+    struct FileCasIndexHeader
+    {
+        static constexpr uint32_t ExpectedMagic  = 0x75696478;  // the characters 'u' 'i' 'd' 'x'
+        static constexpr uint32_t CurrentVersion = 1;
+
+        uint32_t Magic       = ExpectedMagic;
+        uint32_t Version     = CurrentVersion;
+        uint64_t EntryCount  = 0;
+        uint64_t LogPosition = 0;
+        uint32_t Reserved    = 0;
+        uint32_t Checksum    = 0;
+
+        // XXH32 over every header byte except the trailing Checksum field itself
+        static uint32_t ComputeChecksum(const FileCasIndexHeader& Header)
+        {
+            constexpr size_t   HashedByteCount = sizeof(FileCasIndexHeader) - sizeof(uint32_t);
+            constexpr uint32_t Seed            = 0xC0C0'BABA;
+            return XXH32(&Header.Magic, HashedByteCount, Seed);
+        }
+    };
+
+    static_assert(sizeof(FileCasIndexHeader) == 32);
+
+#pragma pack(pop)
+
+}  // namespace filecas::impl
+
+FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash)
+{
+    // Resulting layout:
+    //
+    //   <RootPath> / <hex digits 0..2> / <hex digits 3..4> / <hex digits 5..39>
+    //
+    // i.e. two directory levels carrying 12 bits and 8 bits of the hash
+    // respectively, for a maximum of 4096 * 256 directories.
+    //
+    // The split was picked somewhat arbitrarily, but it is wide enough to scale
+    // to very large chunk repositories without putting too many directories
+    // on a single level, which NTFS does not deal with very well.
+    //
+    // Making this a configurable policy may or may not be worthwhile; measuring
+    // performance for different policies and chunk counts would be a good idea.
+
+    ShardedPath.Append(RootPath.c_str());
+
+    ExtendableStringBuilder<64> HexBuilder;
+    ChunkHash.ToHexString(HexBuilder);
+
+    const char* const Hex           = HexBuilder.c_str();
+    const char* const FirstLevelEnd = Hex + 3;
+    const char* const SecondLevelEnd = Hex + 5;
+    const char* const HashEnd       = Hex + 40;
+
+    // First directory level
+    ShardedPath.AppendSeparator();
+    ShardedPath.AppendAsciiRange(Hex, FirstLevelEnd);
+
+    // Second directory level; remember how long the directory part of the path is
+    ShardedPath.AppendSeparator();
+    ShardedPath.AppendAsciiRange(FirstLevelEnd, SecondLevelEnd);
+    Shard2len = ShardedPath.Size();
+
+    // File name: the remaining hex digits
+    ShardedPath.AppendSeparator();
+    ShardedPath.AppendAsciiRange(SecondLevelEnd, HashEnd);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Hands the GC manager to the GcStorage base and binds the "filecas" logger.
+// The store is not usable until Initialize() has been called.
+FileCasStrategy::FileCasStrategy(GcManager& Gc)
+: GcStorage(Gc)
+, m_Log(logging::Get("filecas"))
+{
+}
+
+// Nothing to do beyond destroying the members; defined out of line in this translation unit
+FileCasStrategy::~FileCasStrategy() = default;
+
+// Prepares the store rooted at RootDirectory.
+//
+// When IsNewStore is set, the existing log, index and all sharded payload
+// directories under the root are deleted first. The in-memory index is then
+// rebuilt from the index file plus the log, the tracked total size is
+// recomputed from it, the log is opened for writing and - for a new store or
+// when log entries were read - a fresh index snapshot is written.
+//
+// Throws whatever the std::filesystem removal calls throw on failure.
+void
+FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore)
+{
+    using namespace filecas::impl;
+
+    m_IsInitialized = true;
+
+    m_RootDirectory = RootDirectory;
+
+    m_Index.clear();
+
+    std::filesystem::path LogPath   = GetLogPath(m_RootDirectory);
+    std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory);
+
+    if (IsNewStore)
+    {
+        std::filesystem::remove(LogPath);
+        std::filesystem::remove(IndexPath);
+
+        if (std::filesystem::is_directory(m_RootDirectory))
+        {
+            // We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside
+            // in this folder as well
+            struct Visitor : public FileSystemTraversal::TreeVisitor
+            {
+                virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override
+                {
+                    // We don't care about files
+                }
+                static bool IsHexChar(std::filesystem::path::value_type C)
+                {
+                    return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16];
+                }
+                virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
+                                            [[maybe_unused]] const path_view&             DirectoryName) override
+                {
+                    // A sharded root is a directory whose name is exactly three hex digits
+                    if (DirectoryName.length() == 3)
+                    {
+                        if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2]))
+                        {
+                            ShardedRoots.push_back(Parent / DirectoryName);
+                        }
+                    }
+                    // NOTE(review): false presumably tells the traversal not to descend - confirm
+                    return false;
+                }
+                std::vector<std::filesystem::path> ShardedRoots;
+            } CasVisitor;
+
+            FileSystemTraversal Traversal;
+            Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor);
+            for (const std::filesystem::path& ShardedRoot : CasVisitor.ShardedRoots)
+            {
+                std::filesystem::remove_all(ShardedRoot);
+            }
+        }
+    }
+
+    m_LogFlushPosition     = ReadIndexFile();
+    uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
+
+    // Recompute the total from scratch and store it, rather than adding on top of
+    // whatever the counter held before - the index was cleared above, so a
+    // repeated Initialize() must not count the same entries twice.
+    uint64_t TotalSize = 0;
+    for (const auto& Entry : m_Index)
+    {
+        TotalSize += Entry.second.Size;
+    }
+    m_TotalSize.store(TotalSize, std::memory_order::relaxed);
+
+    CreateDirectories(m_RootDirectory);
+    m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+    if (IsNewStore || LogEntryCount > 0)
+    {
+        MakeIndexSnapshot();
+    }
+}
+
+#if ZEN_PLATFORM_WINDOWS
+// Flags the file behind FileHandle for deletion once its last handle is closed.
+// Failure is only logged as a warning; nothing is thrown or returned.
+static void
+DeletePayloadFileOnClose(const void* FileHandle)
+{
+    const HANDLE WinFileHandle = const_cast<HANDLE>(FileHandle);
+
+    // This will cause the file to be deleted when the last handle to it is closed
+    FILE_DISPOSITION_INFO Fdi{};
+    Fdi.DeleteFile = TRUE;
+
+    if (SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof(Fdi)))
+    {
+        return;
+    }
+
+    // Capture the error text before doing anything else: argument evaluation order is
+    // unspecified and resolving the path from the handle may overwrite the thread's last error.
+    const auto ErrorText = GetLastErrorAsString();
+
+    // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed
+    // to delete it.
+    ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'", PathFromHandle(WinFileHandle), ErrorText);
+}
+#endif
+
+// Inserts a chunk into the file store.
+//
+//  - kCopyOnly mode: returns {.New = false} if the hash is already indexed,
+//    otherwise writes the payload through the raw-data InsertChunk overload.
+//  - Otherwise, if the chunk is backed by a whole file, the file is moved
+//    (Windows: rename by handle) or hard-linked (Linux/Mac) into its sharded
+//    location to avoid copying the payload. If the target already exists the
+//    source file is disposed of and only the index is updated.
+//  - If the move/link fails for another reason, or the chunk is not file
+//    backed, the payload is written through the raw-data overload.
+//
+// Returns whether the hash was new to the in-memory index. On Windows this
+// throws (via ThrowSystemException) if the shard directory cannot be opened.
+CasStore::InsertResult
+FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode)
+{
+    ZEN_ASSERT(m_IsInitialized);
+
+#if !ZEN_WITH_TESTS
+    ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
+
+    // Adds the hash to the in-memory index under the exclusive index lock and, if it
+    // was not there before, adds its size to the tracked total. Returns true if new.
+    auto AddToIndex = [this, &ChunkHash](uint64_t EntrySize) -> bool {
+        bool IsNew = false;
+        {
+            RwLock::ExclusiveLockScope __(m_Lock);
+            IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = EntrySize}}).second;
+        }
+        if (IsNew)
+        {
+            m_TotalSize.fetch_add(EntrySize, std::memory_order::relaxed);
+        }
+        return IsNew;
+    };
+
+    if (Mode == CasStore::InsertMode::kCopyOnly)
+    {
+        {
+            RwLock::SharedLockScope _(m_Lock);
+            if (m_Index.contains(ChunkHash))
+            {
+                return CasStore::InsertResult{.New = false};
+            }
+        }
+        return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
+    }
+
+    // File-based chunks have special case handling whereby we move the file into
+    // place in the file store directory, thus avoiding unnecessary copying
+
+    IoBufferFileReference FileRef;
+    if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
+    {
+        {
+            bool Exists = true;
+            {
+                RwLock::SharedLockScope _(m_Lock);
+                Exists = m_Index.contains(ChunkHash);
+            }
+            if (Exists)
+            {
+                // Already indexed: just get rid of the incoming source file
+#if ZEN_PLATFORM_WINDOWS
+                DeletePayloadFileOnClose(FileRef.FileHandle);
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+                std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle);
+                if (unlink(FilePath.c_str()) < 0)
+                {
+                    int UnlinkError = zen::GetLastError();
+                    if (UnlinkError != ENOENT)
+                    {
+                        ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
+                                 FilePath.string(),
+                                 GetSystemErrorAsString(UnlinkError));
+                    }
+                }
+#endif
+                return CasStore::InsertResult{.New = false};
+            }
+        }
+
+        ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
+
+        // Held until explicitly released below or until the end of this scope, i.e.
+        // it is no longer held when we fall back to the raw-data overload
+        RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
+
+#if ZEN_PLATFORM_WINDOWS
+        const HANDLE ChunkFileHandle = FileRef.FileHandle;
+        // See if file already exists
+        {
+            CAtlFile PayloadFile;
+
+            if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes))
+            {
+                // If we succeeded in opening the target file then we don't need to do anything else because it already exists
+                // and should contain the content we were about to insert
+
+                // We do need to ensure the source file goes away on close, however
+                size_t   ChunkSize = Chunk.GetSize();
+                uint64_t FileSize  = 0;
+                if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
+                {
+                    HashLock.ReleaseNow();
+
+                    const bool IsNew = AddToIndex(ChunkSize);
+
+                    DeletePayloadFileOnClose(ChunkFileHandle);
+
+                    return CasStore::InsertResult{.New = IsNew};
+                }
+                else
+                {
+                    // NOTE(review): the rename below uses ReplaceIfExists = FALSE, so it will not
+                    // actually replace the mismatching file - confirm the intended behavior.
+                    ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
+                             Name.ShardedPath.ToUtf8(),
+                             ChunkSize,
+                             FileSize);
+                }
+            }
+            else
+            {
+                if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
+                {
+                    // Shard directory does not exist
+                }
+                else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND))
+                {
+                    // Shard directory exists, but not the file
+                }
+                else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION))
+                {
+                    // Sharing violation, likely because we are trying to open a file
+                    // which has been renamed on another thread, and the file handle
+                    // used to rename it is still open. We handle this case below
+                    // instead of here
+                }
+                else
+                {
+                    ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes);
+                }
+            }
+        }
+
+        std::filesystem::path FullPath(Name.ShardedPath.c_str());
+
+        std::filesystem::path FilePath = FullPath.parent_path();
+        std::wstring          FileName = FullPath.native();
+
+        // FILE_RENAME_INFO already contains one WCHAR of FileName, which leaves room for the terminator
+        const DWORD       FileNameBytes = gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR));
+        const DWORD       BufferSize    = sizeof(FILE_RENAME_INFO) + FileNameBytes;
+        FILE_RENAME_INFO* RenameInfo    = reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize));
+        memset(RenameInfo, 0, BufferSize);
+
+        RenameInfo->ReplaceIfExists = FALSE;
+        // FileNameLength is specified in bytes (not characters), excluding the terminator
+        RenameInfo->FileNameLength = FileNameBytes;
+        memcpy(RenameInfo->FileName, FileName.c_str(), FileNameBytes);
+        RenameInfo->FileName[FileName.size()] = 0;
+
+        auto FreeRenameInfo = MakeGuard([&] { Memory::Free(RenameInfo); });
+
+        // Try to move file into place. The error code is captured immediately after each
+        // failed attempt so that later calls and handle cleanup cannot overwrite it.
+        BOOL  Success     = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
+        DWORD RenameError = Success ? ERROR_SUCCESS : GetLastError();
+
+        if (!Success)
+        {
+            // The rename/move could fail because the target directory does not yet exist. This code attempts
+            // to create it
+
+            CAtlFile DirHandle;
+
+            auto InternalCreateDirectoryHandle = [&] {
+                return DirHandle.Create(FilePath.c_str(),
+                                        GENERIC_READ | GENERIC_WRITE,
+                                        FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
+                                        OPEN_EXISTING,
+                                        FILE_FLAG_BACKUP_SEMANTICS);
+            };
+
+            // It's possible for several threads to enter this logic trying to create the same
+            // directory. Only one will create the directory of course, but all threads will
+            // make it through okay
+
+            HRESULT hRes = InternalCreateDirectoryHandle();
+
+            if (FAILED(hRes))
+            {
+                // TODO: we can handle directory creation more intelligently and efficiently than
+                // this currently does
+
+                CreateDirectories(FilePath.c_str());
+
+                hRes = InternalCreateDirectoryHandle();
+            }
+
+            if (FAILED(hRes))
+            {
+                ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath));
+            }
+
+            // Retry rename/move
+
+            Success     = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
+            RenameError = Success ? ERROR_SUCCESS : GetLastError();
+        }
+
+        if (Success)
+        {
+            m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+
+            HashLock.ReleaseNow();
+
+            const bool IsNew = AddToIndex(Chunk.Size());
+
+            return CasStore::InsertResult{.New = IsNew};
+        }
+
+        if ((RenameError == ERROR_FILE_EXISTS) || (RenameError == ERROR_ALREADY_EXISTS))
+        {
+            HashLock.ReleaseNow();
+            DeletePayloadFileOnClose(ChunkFileHandle);
+
+            const bool IsNew = AddToIndex(Chunk.Size());
+
+            return CasStore::InsertResult{.New = IsNew};
+        }
+
+        ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
+                 GetSystemErrorAsString(RenameError),
+                 ChunkHash);
+
+        DeletePayloadFileOnClose(ChunkFileHandle);
+
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+        std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle);
+        std::filesystem::path DestPath   = Name.ShardedPath.c_str();
+        int                   Ret        = link(SourcePath.c_str(), DestPath.c_str());
+        if (Ret < 0 && zen::GetLastError() == ENOENT)
+        {
+            // Destination directory doesn't exist. Create it and try again.
+            CreateDirectories(DestPath.parent_path().c_str());
+            Ret = link(SourcePath.c_str(), DestPath.c_str());
+        }
+        // Only meaningful when Ret < 0; captured before unlink() can change it
+        int LinkError = zen::GetLastError();
+
+        // The source name is removed regardless of the link outcome; the payload stays
+        // readable through the still-open handle for the fallback write below
+        if (unlink(SourcePath.c_str()) < 0)
+        {
+            int UnlinkError = zen::GetLastError();
+            if (UnlinkError != ENOENT)
+            {
+                ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
+                         SourcePath.string(),
+                         GetSystemErrorAsString(UnlinkError));
+            }
+        }
+
+        // It is possible that someone beat us to it in linking the file. In that
+        // case a "file exists" error is okay. All others are not.
+        if (Ret < 0)
+        {
+            if (LinkError == EEXIST)
+            {
+                HashLock.ReleaseNow();
+                const bool IsNew = AddToIndex(Chunk.Size());
+                return CasStore::InsertResult{.New = IsNew};
+            }
+
+            ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
+                     GetSystemErrorAsString(LinkError),
+                     ChunkHash);
+        }
+        else
+        {
+            // We placed the file ourselves: record it in the log, same as the
+            // Windows rename-success path does
+            m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+
+            HashLock.ReleaseNow();
+            const bool IsNew = AddToIndex(Chunk.Size());
+            return CasStore::InsertResult{.New = IsNew};
+        }
+#endif  // ZEN_PLATFORM_*
+    }
+
+    return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
+}
+
+CasStore::InsertResult
+FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
+{
+ ZEN_ASSERT(m_IsInitialized);
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ if (m_Index.contains(ChunkHash))
+ {
+ return {.New = false};
+ }
+ }
+
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
+
+ // See if file already exists
+
+#if ZEN_PLATFORM_WINDOWS
+ CAtlFile PayloadFile;
+
+ HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
+
+ if (SUCCEEDED(hRes))
+ {
+ // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the
+ // content we were about to insert
+
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ }
+ return CasStore::InsertResult{.New = IsNew};
+ }
+
+ PayloadFile.Close();
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ if (access(Name.ShardedPath.c_str(), F_OK) == 0)
+ {
+ return CasStore::InsertResult{.New = false};
+ }
+#endif
+
+ RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
+
+#if ZEN_PLATFORM_WINDOWS
+ // For now, use double-checked locking to see if someone else was first
+
+ hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
+
+ if (SUCCEEDED(hRes))
+ {
+ uint64_t FileSize = 0;
+ if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
+ {
+ // If we succeeded in opening the file then and the size is correct we don't need to do anything
+ // else because someone else managed to create the file before we did. Just return.
+
+ HashLock.ReleaseNow();
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ }
+ return CasStore::InsertResult{.New = IsNew};
+ }
+ else
+ {
+ ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
+ Name.ShardedPath.ToUtf8(),
+ ChunkSize,
+ FileSize);
+ }
+ }
+
+ if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)))
+ {
+ ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes));
+ }
+
+ auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };
+
+ hRes = InternalCreateFile();
+
+ if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
+ {
+ // Ensure parent directories exist and retry file creation
+ CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len));
+ hRes = InternalCreateFile();
+ }
+
+ if (FAILED(hRes))
+ {
+ ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8()));
+ }
+#else
+ // Attempt to exclusively create the file.
+ auto InternalCreateFile = [&] {
+ int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666);
+ if (Fd >= 0)
+ {
+ fchmod(Fd, 0666);
+ }
+ return Fd;
+ };
+ int Fd = InternalCreateFile();
+ if (Fd < 0)
+ {
+ switch (zen::GetLastError())
+ {
+ case EEXIST:
+ // Another thread has beat us to it so we're golden.
+ {
+ HashLock.ReleaseNow();
+
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ }
+ return {.New = IsNew};
+ }
+ break;
+
+ case ENOENT:
+ if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len)))
+ {
+ Fd = InternalCreateFile();
+ if (Fd >= 0)
+ {
+ break;
+ }
+ }
+ ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath));
+
+ default:
+ ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8()));
+ }
+ }
+
+ struct FdWrapper
+ {
+ ~FdWrapper() { Close(); }
+ void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); }
+ void Close()
+ {
+ if (Fd >= 0)
+ {
+ close(Fd);
+ Fd = -1;
+ }
+ }
+ int Fd;
+ } PayloadFile = {Fd};
+#endif // ZEN_PLATFORM_WINDOWS
+
+ size_t ChunkRemain = ChunkSize;
+ auto ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData);
+
+ while (ChunkRemain != 0)
+ {
+ uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain));
+
+ PayloadFile.Write(ChunkCursor, ByteCount);
+
+ ChunkCursor += ByteCount;
+ ChunkRemain -= ByteCount;
+ }
+
+ // We cannot rely on RAII to close the file handle since it would be closed
+ // *after* the lock is released due to the initialization order
+ PayloadFile.Close();
+
+ m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});
+
+ HashLock.ReleaseNow();
+
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ }
+
+ return {.New = IsNew};
+}
+
+// Looks up the payload stored under ChunkHash.
+//
+// Returns an empty IoBuffer when the hash is not present in the in-memory index,
+// otherwise whatever IoBufferBuilder::MakeFromFile produces for the sharded path.
+IoBuffer
+FileCasStrategy::FindChunk(const IoHash& ChunkHash)
+{
+    ZEN_ASSERT(m_IsInitialized);
+
+    // The index lock is only held for the membership test
+    bool IsIndexed = false;
+    {
+        RwLock::SharedLockScope IndexLock(m_Lock);
+        IsIndexed = m_Index.contains(ChunkHash);
+    }
+    if (!IsIndexed)
+    {
+        return {};
+    }
+
+    ShardingHelper ShardName(m_RootDirectory.c_str(), ChunkHash);
+
+    // Hold the per-hash lock (shared) while the file is opened
+    RwLock::SharedLockScope HashLock(LockForHash(ChunkHash));
+    return IoBufferBuilder::MakeFromFile(ShardName.ShardedPath.c_str());
+}
+
+// Reports whether ChunkHash is present in the in-memory index (the file system is not consulted).
+bool
+FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
+{
+    ZEN_ASSERT(m_IsInitialized);
+
+    RwLock::SharedLockScope IndexLock(m_Lock);
+    const bool              IsKnown = m_Index.contains(ChunkHash);
+    return IsKnown;
+}
+
+// Deletes the backing file for ChunkHash. When the file is gone afterwards the
+// hash is removed from the in-memory index (adjusting m_TotalSize) and a
+// tombstone record is appended to the log.
+//
+// Ec receives the outcome of the std::filesystem::remove call. The file size is
+// looked up first, but only for logging and for the tombstone record; if that
+// lookup fails the size is reported as zero.
+void
+FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
+{
+    ShardingHelper              Name(m_RootDirectory.c_str(), ChunkHash);
+    const std::filesystem::path ChunkPath(Name.ShardedPath.c_str());
+
+    uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(ChunkPath, Ec));
+    if (Ec)
+    {
+        ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8());
+        FileSize = 0;
+    }
+
+    ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize));
+    std::filesystem::remove(ChunkPath, Ec);
+
+    bool IsGone = !Ec;
+    if (!IsGone)
+    {
+        // remove() reported an error; the chunk still counts as deleted if the file is
+        // verifiably absent. The non-throwing overload is used so that a failing probe
+        // cannot escape as an exception from a function which reports errors through Ec.
+        std::error_code ExistsEc;
+        IsGone = !std::filesystem::exists(ChunkPath, ExistsEc) && !ExistsEc;
+    }
+    if (!IsGone)
+    {
+        return;
+    }
+
+    {
+        RwLock::ExclusiveLockScope _(m_Lock);
+        if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
+        {
+            m_TotalSize.fetch_sub(It->second.Size, std::memory_order::relaxed);
+            m_Index.erase(It);
+        }
+    }
+    m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize});
+}
+
+// Removes from InOutChunks every hash for which HaveChunk() returns true.
+void
+FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
+{
+    ZEN_ASSERT(m_IsInitialized);
+
+    // NOTE: not an issue today, but if a GC were to run while this is in flight the
+    //       answer could be stale, since chunks may disappear in the meantime.
+    //
+    //       A pinning mechanism would make that less likely. Chunks can however go
+    //       away at any point after the result has been handed back, so consumers of
+    //       this functionality have to cope with that regardless.
+
+    InOutChunks.RemoveHashesIf([this](const IoHash& Candidate) { return HaveChunk(Candidate); });
+}
+
+// Invokes Callback once per entry of the in-memory index, passing the hash and
+// the result of IoBufferBuilder::MakeFromFile for the chunk's sharded path.
+// The index lock is held (shared) for the whole iteration.
+void
+FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback)
+{
+    ZEN_ASSERT(m_IsInitialized);
+
+    RwLock::SharedLockScope IndexLock(m_Lock);
+    for (auto It = m_Index.begin(); It != m_Index.end(); ++It)
+    {
+        const IoHash&  ChunkHash = It->first;
+        ShardingHelper ShardName(m_RootDirectory.c_str(), ChunkHash);
+        Callback(ChunkHash, IoBufferBuilder::MakeFromFile(ShardName.ShardedPath.c_str()));
+    }
+}
+
+// Intentionally a no-op, see the notes inside.
+void
+FileCasStrategy::Flush()
+{
+    // Files are not kept open once they have been written, so there is nothing
+    // specific to flush at this point.
+    //
+    // Depending on which guarantees Flush() is supposed to give, one could argue
+    // that the volume holding the CAS files should be flushed here so that
+    // metadata reaches disk together with the file data.
+    //
+    // Related: keeping a log of when chunks were created would allow more
+    // targeted validation during recovery.
+}
+
+// Validates every chunk known to the store and reports the outcome through Ctx.
+//
+//  1. The root directory is scanned; files found on disk which the index does
+//     not know about are added to the index and appended to the log.
+//  2. Every indexed chunk is visited. A chunk is recorded as bad when its payload
+//     is empty/unavailable, when its compressed header validates but carries a
+//     different raw hash, or when the header does not validate (test builds
+//     additionally accept a payload whose plain content hash matches).
+//  3. If Ctx.RunRecovery() is set, the backing files of bad chunks are deleted.
+void
+FileCasStrategy::Scrub(ScrubContext& Ctx)
+{
+    ZEN_ASSERT(m_IsInitialized);
+
+    std::vector<IoHash> BadHashes;
+    uint64_t            ChunkCount = 0;
+    uint64_t            ChunkBytes = 0;
+
+    // Pick up files which are on disk but missing from the index. The directory
+    // scan runs before the index lock is taken.
+    {
+        std::vector<FileCasStrategy::FileCasIndexEntry> OnDisk = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
+
+        RwLock::ExclusiveLockScope IndexLock(m_Lock);
+        for (const FileCasStrategy::FileCasIndexEntry& Found : OnDisk)
+        {
+            const bool WasAdded = m_Index.insert({Found.Key, IndexEntry{.Size = Found.Size}}).second;
+            if (!WasAdded)
+            {
+                continue;
+            }
+            m_TotalSize.fetch_add(static_cast<uint64_t>(Found.Size), std::memory_order::relaxed);
+            m_CasLog.Append({.Key = Found.Key, .Size = Found.Size});
+        }
+    }
+
+    IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
+        if (!Payload)
+        {
+            BadHashes.push_back(Hash);
+            return;
+        }
+
+        ChunkCount += 1;
+        ChunkBytes += Payload.GetSize();
+
+        IoHash     RawHash;
+        uint64_t   RawSize;
+        const bool HasValidHeader = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize);
+        if (HasValidHeader)
+        {
+            // Compressed payload: the raw hash from the header must match the key
+            if (RawHash != Hash)
+            {
+                BadHashes.push_back(Hash);
+            }
+            return;
+        }
+#if ZEN_WITH_TESTS
+        const IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload))));
+        if (ComputedHash == Hash)
+        {
+            return;
+        }
+#endif
+        BadHashes.push_back(Hash);
+    });
+
+    Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
+
+    const bool FoundBadChunks = !BadHashes.empty();
+    if (FoundBadChunks)
+    {
+        ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size());
+    }
+
+    if (FoundBadChunks && Ctx.RunRecovery())
+    {
+        ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size());
+
+        for (const IoHash& BadHash : BadHashes)
+        {
+            std::error_code DeleteEc;
+            DeleteChunk(BadHash, DeleteEc);
+
+            if (DeleteEc)
+            {
+                ZEN_WARN("failed to delete file for chunk {}", BadHash);
+            }
+        }
+    }
+
+    // Hand the list of bad chunks to whoever is interested. Higher level data
+    // structures may be able to use it to invalidate themselves more cheaply
+    // than with a full validation pass
+    Ctx.ReportBadCidChunks(BadHashes);
+
+    ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
+}
+
+// Deletes every chunk which GcCtx does not ask to keep.
+//
+// All indexed chunks are visited; a chunk is kept when GcCtx.FilterCids invokes
+// the keep callback for it. If GcCtx.IsDeletionMode() is false the candidates are
+// only logged. Otherwise each candidate is deleted and the complete candidate
+// list is passed to GcCtx.AddDeletedCids.
+void
+FileCasStrategy::CollectGarbage(GcContext& GcCtx)
+{
+    ZEN_ASSERT(m_IsInitialized);
+
+    ZEN_DEBUG("collecting garbage from {}", m_RootDirectory);
+
+    std::vector<IoHash>   ChunksToDelete;
+    std::atomic<uint64_t> ChunksToDeleteBytes{0};
+    std::atomic<uint64_t> ChunkCount{0};
+    std::atomic<uint64_t> ChunkBytes{0};
+
+    // Single-element scratch vector reused for every FilterCids query
+    std::vector<IoHash> CandidateCas(1);
+
+    uint64_t DeletedCount = 0;
+    uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+    Stopwatch  TotalTimer;
+    const auto _ = MakeGuard([&] {
+        ZEN_DEBUG("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}",
+                  m_RootDirectory,
+                  NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+                  DeletedCount,
+                  ChunkCount.load(),
+                  NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)),
+                  NiceBytes(OldTotalSize));
+    });
+
+    IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
+        bool IsRetained = false;
+        CandidateCas[0] = Hash;
+        GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) {
+            ZEN_UNUSED(Hash);
+            IsRetained = true;
+        });
+
+        const uint64_t PayloadSize = Payload.GetSize();
+
+        if (!IsRetained)
+        {
+            ChunksToDelete.push_back(Hash);
+            ChunksToDeleteBytes += PayloadSize;
+        }
+
+        ChunkCount.fetch_add(1);
+        ChunkBytes += PayloadSize;
+    });
+
+    // TODO: any entries we did not encounter during our IterateChunks should be removed from the index
+
+    if (ChunksToDelete.empty())
+    {
+        ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory);
+        return;
+    }
+
+    ZEN_DEBUG("deleting file CAS garbage for '{}': {} out of {} chunks ({})",
+              m_RootDirectory,
+              ChunksToDelete.size(),
+              ChunkCount.load(),
+              NiceBytes(ChunksToDeleteBytes));
+
+    if (!GcCtx.IsDeletionMode())
+    {
+        ZEN_DEBUG("NOTE: not actually deleting anything since deletion is disabled");
+
+        return;
+    }
+
+    for (const IoHash& Doomed : ChunksToDelete)
+    {
+        ZEN_TRACE("deleting chunk {}", Doomed);
+
+        std::error_code DeleteEc;
+        DeleteChunk(Doomed, DeleteEc);
+
+        if (!DeleteEc)
+        {
+            ++DeletedCount;
+        }
+        else
+        {
+            ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Doomed, DeleteEc.message());
+        }
+    }
+
+    // NOTE(review): this reports every candidate, including those whose deletion failed above
+    GcCtx.AddDeletedCids(ChunksToDelete);
+}
+
+// Checks a log/index record for plausibility.
+//
+// Returns true when the entry is acceptable. Otherwise returns false and stores a
+// human readable explanation in OutReason. An entry is rejected when its key is
+// IoHash::Zero, when it carries flag bits other than kTombStone, or when it is not
+// a tombstone and has a size of zero.
+bool
+FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason)
+{
+    if (Entry.Key == IoHash::Zero)
+    {
+        OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+        return false;
+    }
+
+    const uint32_t UnknownFlags = Entry.Flags & (~FileCasIndexEntry::kTombStone);
+    if (UnknownFlags)
+    {
+        OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
+        return false;
+    }
+
+    // Tombstones are not subject to the size check
+    const bool IsTombStone = Entry.IsFlagSet(FileCasIndexEntry::kTombStone);
+    if (IsTombStone || Entry.Size != 0)
+    {
+        return true;
+    }
+
+    OutReason = fmt::format("Invalid size {} for entry {}", Entry.Size, Entry.Key.ToHexString());
+    return false;
+}
+
+// Writes the current contents of the in-memory index to the index file.
+//
+// Nothing is written when the log count equals m_LogFlushPosition. An existing
+// index file is first renamed to the temporary index path; if writing the new
+// snapshot throws a std::exception the previous file is moved back. The
+// temporary file is removed at the end in either case.
+void
+FileCasStrategy::MakeIndexSnapshot()
+{
+    using namespace filecas::impl;
+
+    const uint64_t LogCount = m_CasLog.GetLogCount();
+    if (m_LogFlushPosition == LogCount)
+    {
+        return;
+    }
+
+    ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory);
+    uint64_t   EntryCount = 0;
+    Stopwatch  Timer;
+    const auto _ = MakeGuard([&] {
+        ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
+                 m_RootDirectory,
+                 EntryCount,
+                 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+    });
+
+    namespace fs = std::filesystem;
+
+    const fs::path IndexPath     = GetIndexPath(m_RootDirectory);
+    const fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory);
+
+    // Park the current index under the temporary name so it can be restored on failure
+    if (fs::is_regular_file(STmpIndexPath))
+    {
+        fs::remove(STmpIndexPath);
+    }
+    if (fs::is_regular_file(IndexPath))
+    {
+        fs::rename(IndexPath, STmpIndexPath);
+    }
+
+    try
+    {
+        // Flatten the index map into the on-disk record layout
+        std::vector<FileCasIndexEntry> Entries;
+        Entries.reserve(m_Index.size());
+        for (auto& Entry : m_Index)
+        {
+            Entries.push_back(FileCasIndexEntry{.Key = Entry.first, .Size = Entry.second.Size});
+        }
+
+        BasicFile ObjectIndexFile;
+        ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+
+        filecas::impl::FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount};
+        Header.Checksum                          = filecas::impl::FileCasIndexHeader::ComputeChecksum(Header);
+
+        // Layout: header at offset 0, immediately followed by the entry array
+        const uint64_t HeaderSize = sizeof(filecas::impl::FileCasIndexHeader);
+        ObjectIndexFile.Write(&Header, HeaderSize, 0);
+        ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), HeaderSize);
+        ObjectIndexFile.Flush();
+        ObjectIndexFile.Close();
+
+        EntryCount         = Entries.size();
+        m_LogFlushPosition = LogCount;
+    }
+    catch (std::exception& Err)
+    {
+        ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
+
+        // Put the previous snapshot back, if there was one
+        if (fs::is_regular_file(STmpIndexPath))
+        {
+            fs::remove(IndexPath);
+            fs::rename(STmpIndexPath, IndexPath);
+        }
+    }
+
+    if (fs::is_regular_file(STmpIndexPath))
+    {
+        fs::remove(STmpIndexPath);
+    }
+}
+// Populates the in-memory index from the index snapshot file.
+//
+// Returns the log position recorded in the snapshot header, i.e. the number of
+// log entries which are already reflected in the snapshot, or 0 when no usable
+// snapshot was read.
+//
+// If no index file exists but the root directory does, the directory is scanned
+// for chunk files instead; the log file is truncated and rewritten from what the
+// scan finds, and 0 is returned.
+uint64_t
+FileCasStrategy::ReadIndexFile()
+{
+    using namespace filecas::impl;
+
+    std::vector<FileCasIndexEntry> Entries;
+    std::filesystem::path          IndexPath = GetIndexPath(m_RootDirectory);
+    if (std::filesystem::is_regular_file(IndexPath))
+    {
+        Stopwatch  Timer;
+        const auto _ = MakeGuard([&] {
+            ZEN_INFO("read store '{}' index containing {} entries in {}",
+                     IndexPath,
+                     Entries.size(),
+                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+        });
+
+        BasicFile ObjectIndexFile;
+        ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+        uint64_t Size = ObjectIndexFile.FileSize();
+        if (Size >= sizeof(FileCasIndexHeader))
+        {
+            // Upper bound for the number of entries the file can actually hold. The
+            // header size has to be subtracted here (not sizeof(size_t)), otherwise a
+            // header claiming more entries than are present would pass validation
+            uint64_t           ExpectedEntryCount = (Size - sizeof(FileCasIndexHeader)) / sizeof(FileCasIndexEntry);
+            FileCasIndexHeader Header;
+            ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+            if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) &&
+                (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount))
+            {
+                Entries.resize(Header.EntryCount);
+                ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader));
+
+                std::string InvalidEntryReason;
+                for (const FileCasIndexEntry& Entry : Entries)
+                {
+                    if (!ValidateEntry(Entry, InvalidEntryReason))
+                    {
+                        ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+                        continue;
+                    }
+                    m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
+                }
+
+                return Header.LogPosition;
+            }
+            else
+            {
+                ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+            }
+        }
+        return 0;
+    }
+
+    if (std::filesystem::is_directory(m_RootDirectory))
+    {
+        ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory);
+        TCasLogFile<FileCasIndexEntry> CasLog;
+        uint64_t                       TotalSize = 0;
+        Stopwatch                      TotalTimer;
+        const auto                     _ = MakeGuard([&] {
+            ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}",
+                     m_RootDirectory,
+                     NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+                     CasLog.GetLogCount(),
+                     NiceBytes(TotalSize));
+        });
+
+        std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
+
+        // Rebuild both the in-memory index and the log from the files found on disk
+        std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
+        CasLog.Open(LogPath, CasLogFile::Mode::kTruncate);
+        std::string InvalidEntryReason;
+        for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
+        {
+            if (!ValidateEntry(Entry, InvalidEntryReason))
+            {
+                ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason);
+                continue;
+            }
+            m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
+            CasLog.Append(Entry);
+        }
+
+        CasLog.Close();
+    }
+
+    return 0;
+}
+
+// Replays the log file into the in-memory index, skipping the first
+// SkipEntryCount records.
+//
+// Tombstone records erase their key, valid records insert or overwrite it, and
+// invalid records are skipped with a warning. If SkipEntryCount exceeds the
+// number of records in the log the whole log is replayed.
+//
+// Returns the number of records covered by the replay, or 0 when there is no log
+// file or it could not be initialized.
+uint64_t
+FileCasStrategy::ReadLog(uint64_t SkipEntryCount)
+{
+    using namespace filecas::impl;
+
+    std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
+    if (!std::filesystem::is_regular_file(LogPath))
+    {
+        return 0;
+    }
+
+    uint64_t   LogEntryCount = 0;
+    Stopwatch  Timer;
+    const auto _ = MakeGuard([&] {
+        ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+    });
+
+    TCasLogFile<FileCasIndexEntry> CasLog;
+    CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+    if (!CasLog.Initialize())
+    {
+        return 0;
+    }
+
+    const uint64_t TotalEntryCount = CasLog.GetLogCount();
+    if (TotalEntryCount < SkipEntryCount)
+    {
+        ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+        SkipEntryCount = 0;
+    }
+    LogEntryCount = TotalEntryCount - SkipEntryCount;
+    m_Index.reserve(LogEntryCount);
+
+    uint64_t InvalidEntryCount = 0;
+    CasLog.Replay(
+        [&](const FileCasIndexEntry& Record) {
+            // Tombstones are applied without further validation
+            if (Record.Flags & FileCasIndexEntry::kTombStone)
+            {
+                m_Index.erase(Record.Key);
+                return;
+            }
+
+            std::string InvalidEntryReason;
+            if (ValidateEntry(Record, InvalidEntryReason))
+            {
+                m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size});
+                return;
+            }
+
+            ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+            ++InvalidEntryCount;
+        },
+        SkipEntryCount);
+
+    if (InvalidEntryCount != 0)
+    {
+        ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, LogPath);
+    }
+    return LogEntryCount;
+}
+
+// Walks RootDir and returns one entry (key + file size) for every file whose
+// location looks like a sharded chunk path: the parent directory, relative to
+// RootDir, is six characters long and the file name is 35 characters long. The
+// key is parsed from the directory characters (minus the separator at position
+// 3) followed by the file name.
+std::vector<FileCasStrategy::FileCasIndexEntry>
+FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir)
+{
+    using namespace filecas::impl;
+
+    std::vector<FileCasIndexEntry> Entries;
+
+    struct Visitor : public FileSystemTraversal::TreeVisitor
+    {
+        Visitor(const std::filesystem::path& InRootDir, std::vector<FileCasIndexEntry>& InEntries)
+        : RootDirectory(InRootDir)
+        , Entries(InEntries)
+        {
+        }
+
+        virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override
+        {
+            const std::filesystem::path        RelativeParent = std::filesystem::relative(Parent, RootDirectory);
+            std::filesystem::path::string_type HexName        = RelativeParent.native();
+
+            // Expect "<3 chars><separator><2 chars>" for the directory and the
+            // remaining 35 characters of the 40 character hex hash as file name
+            const bool HasShardLayout = (HexName.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2));
+            if (!HasShardLayout)
+            {
+                return;
+            }
+
+            if (HexName.at(3) == std::filesystem::path::preferred_separator)
+            {
+                HexName.erase(3, 1);
+            }
+            HexName.append(File);
+
+            // TODO: should validate that we're actually dealing with a valid hex string here
+#if ZEN_PLATFORM_WINDOWS
+            StringBuilder<64> Utf8;
+            WideToUtf8(HexName, Utf8);
+            IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()});
+#else
+            IoHash NameHash = IoHash::FromHexString(HexName);
+#endif
+            Entries.push_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize});
+        }
+
+        // Always descend into sub-directories
+        virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
+                                    [[maybe_unused]] const path_view&             DirectoryName) override
+        {
+            return true;
+        }
+
+        const std::filesystem::path&    RootDirectory;
+        std::vector<FileCasIndexEntry>& Entries;
+    } CasVisitor{RootDir, Entries};
+
+    FileSystemTraversal Traversal;
+    Traversal.TraverseFileSystem(RootDir, CasVisitor);
+    return Entries;
+}
+
+ //////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("cas.file.move") // inserts a payload backed by a temporary file and expects it to be reported as new
+{
+ // specifying an absolute path here can be helpful when using procmon to dig into things
+ ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
+
+ GcManager Gc;
+
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
+
+ {
+ std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
+
+ IoBuffer ZeroBytes{1024 * 1024}; // 1 MiB buffer; presumably zero-filled given the name - TODO confirm
+ IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes);
+
+ BasicFile PayloadFile; // write the buffer to a file outside of the store directory
+ PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate);
+ PayloadFile.Write(ZeroBytes, 0);
+ PayloadFile.Close();
+
+ IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path); // file-backed buffer handed to the store
+
+ CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash);
+ CHECK_EQ(Result.New, true); // the store was created empty, so the chunk must be new
+ }
+
+# if 0 // disabled stress test: many threads inserting file-backed payloads at the same time
+ SUBCASE("stresstest")
+ {
+ std::vector<IoHash> PayloadHashes;
+
+ const int kWorkers = 64;
+ const int kItemCount = 128;
+
+ for (int w = 0; w < kWorkers; ++w)
+ {
+ for (int i = 0; i < kItemCount; ++i)
+ {
+ IoBuffer Payload{1024};
+ *reinterpret_cast<int*>(Payload.MutableData()) = i;
+ PayloadHashes.push_back(IoHash::HashBuffer(Payload));
+
+ std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
+ WriteFile(PayloadPath, Payload);
+ }
+ }
+
+ std::barrier Sync{kWorkers};
+
+ auto PopulateAll = [&](int w) {
+ std::vector<IoBuffer> Buffers;
+
+ for (int i = 0; i < kItemCount; ++i)
+ {
+ std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
+ IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath);
+ Buffers.push_back(Payload);
+ Sync.arrive_and_wait(); // all workers insert item i at the same time
+ CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]);
+ }
+ };
+
+ std::vector<std::jthread> Threads;
+
+ for (int i = 0; i < kWorkers; ++i)
+ {
+ Threads.push_back(std::jthread(PopulateAll, i));
+ }
+
+ for (std::jthread& Thread : Threads)
+ {
+ Thread.join();
+ }
+ }
+# endif
+}
+
+// Exercises FileCasStrategy::CollectGarbage: first with nothing retained, then
+// with the chunks whose first hash byte is odd retained.
+TEST_CASE("cas.file.gc")
+{
+    // specifying an absolute path here can be helpful when using procmon to dig into things
+    ScopedTemporaryDirectory TempDir;  // {"d:\\filecas_testdir"};
+
+    GcManager       Gc;
+    FileCasStrategy FileCas(Gc);
+    FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
+
+    const int           kIterationCount = 1000;
+    std::vector<IoHash> Keys{kIterationCount};
+
+    // Inserts kIterationCount small compact binary objects and records their hashes in Keys
+    auto InsertChunks = [&] {
+        for (int Index = 0; Index < kIterationCount; ++Index)
+        {
+            CbObjectWriter Writer;
+            Writer << "id" << Index;
+            CbObject Object = Writer.Save();
+
+            IoBuffer     ObjectBuffer = Object.GetBuffer().AsIoBuffer();
+            const IoHash ObjectHash   = HashBuffer(ObjectBuffer);
+
+            FileCas.InsertChunk(ObjectBuffer, ObjectHash);
+
+            Keys[Index] = ObjectHash;
+        }
+    };
+
+    // Selects the subset of keys which the second pass asks GC to retain
+    auto ShouldRetain = [](const IoHash& Key) { return (Key.Hash[0] & 1) != 0; };
+
+    // Pass 1: nothing is retained, so every chunk has to be gone afterwards
+
+    {
+        InsertChunks();
+
+        GcContext Ctx(GcClock::Now() - std::chrono::hours(24));
+        FileCas.CollectGarbage(Ctx);
+
+        for (const IoHash& Key : Keys)
+        {
+            IoBuffer Chunk = FileCas.FindChunk(Key);
+
+            CHECK(!Chunk);
+        }
+    }
+
+    // Pass 2: keep roughly half of the chunks
+
+    {
+        InsertChunks();
+
+        GcContext Ctx(GcClock::Now() - std::chrono::hours(24));
+
+        for (const IoHash& Key : Keys)
+        {
+            if (ShouldRetain(Key))
+            {
+                Ctx.AddRetainedCids(std::vector<IoHash>{Key});
+            }
+        }
+
+        FileCas.CollectGarbage(Ctx);
+
+        for (const IoHash& Key : Keys)
+        {
+            if (ShouldRetain(Key))
+            {
+                CHECK(FileCas.FindChunk(Key));
+            }
+            else
+            {
+                CHECK(!FileCas.FindChunk(Key));
+            }
+        }
+    }
+}
+
+#endif
+
+// Deliberately empty. Presumably referenced from elsewhere so that this
+// translation unit is linked in - TODO confirm against the callers.
+void
+filecas_forcelink()
+{
+}
+
+} // namespace zen
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
new file mode 100644
index 000000000..420b3a634
--- /dev/null
+++ b/src/zenstore/filecas.h
@@ -0,0 +1,102 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#include <zencore/filesystem.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/thread.h>
+#include <zenstore/caslog.h>
+#include <zenstore/gc.h>
+
+#include "cas.h"
+
+#include <atomic>
+#include <functional>
+
+namespace spdlog {
+class logger;
+}
+
+namespace zen {
+
+class BasicFile;
+
+/** CAS storage strategy using a file-per-chunk storage strategy
+ *
+ *  Each chunk lives in its own file below the root directory. An in-memory index
+ *  maps chunk hashes to their sizes, and changes to it are recorded in a log of
+ *  FileCasIndexEntry records.
+ */
+
+struct FileCasStrategy final : public GcStorage
+{
+    FileCasStrategy(GcManager& Gc);
+    ~FileCasStrategy();
+
+    void                   Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore);
+    CasStore::InsertResult InsertChunk(IoBuffer             Chunk,
+                                       const IoHash&        ChunkHash,
+                                       CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace);
+    IoBuffer               FindChunk(const IoHash& ChunkHash);
+    bool                   HaveChunk(const IoHash& ChunkHash);
+    void                   FilterChunks(HashKeySet& InOutChunks);
+    void                   Flush();
+    void                   Scrub(ScrubContext& Ctx);
+    virtual void           CollectGarbage(GcContext& GcCtx) override;
+
+    /** Reports the running total of chunk sizes tracked in m_TotalSize as disk size */
+    virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; }
+
+private:
+    void     MakeIndexSnapshot();
+    uint64_t ReadIndexFile();
+    uint64_t ReadLog(uint64_t LogPosition);
+
+    /** Value type of the in-memory index */
+    struct IndexEntry
+    {
+        uint64_t Size = 0;
+    };
+    using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>;
+
+    CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
+
+    std::filesystem::path m_RootDirectory;
+    RwLock                m_Lock;  // guards m_Index
+    IndexMap              m_Index;
+    RwLock                m_ShardLocks[256];  // TODO: these should be spaced out so they don't share cache lines
+    spdlog::logger&       m_Log;
+    spdlog::logger&       Log() { return m_Log; }
+    std::atomic_uint64_t  m_TotalSize{};
+    bool                  m_IsInitialized = false;
+
+    /** Record format used for both the log and the index snapshot (32 bytes, see static_assert below) */
+    struct FileCasIndexEntry
+    {
+        static const uint32_t kTombStone = 0x0000'0001;
+
+        /** True when every bit of Flag is set in Flags.
+         *
+         *  The mask has to be the Flag argument itself; masking with kTombStone only
+         *  gives the right answer for as long as kTombStone is the sole flag.
+         */
+        bool IsFlagSet(const uint32_t Flag) const { return (Flags & Flag) == Flag; }
+
+        IoHash   Key;
+        uint32_t Flags = 0;
+        uint64_t Size  = 0;
+    };
+    static bool                                            ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason);
+    static std::vector<FileCasStrategy::FileCasIndexEntry> ScanFolderForCasFiles(const std::filesystem::path& RootDir);
+
+    static_assert(sizeof(FileCasIndexEntry) == 32);
+
+    TCasLogFile<FileCasIndexEntry> m_CasLog;
+    uint64_t                       m_LogFlushPosition = 0;  // log count covered by the most recent index snapshot
+
+    /** Selects one of the 256 shard locks using byte 19 of the hash */
+    inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
+    void           IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback);
+    void           DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec);
+
+    /** Computes the sharded on-disk path for a chunk hash below a root path */
+    struct ShardingHelper
+    {
+        ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash);
+
+        size_t                     Shard2len = 0;
+        ExtendablePathBuilder<128> ShardedPath;
+    };
+};
+
+void filecas_forcelink();
+
+} // namespace zen
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
new file mode 100644
index 000000000..370c3c965
--- /dev/null
+++ b/src/zenstore/gc.cpp
@@ -0,0 +1,1312 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/gc.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
+#include <zencore/timer.h>
+#include <zenstore/cidstore.h>
+
+#include "cas.h"
+
+#include <fmt/format.h>
+#include <filesystem>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#else
+# include <fcntl.h>
+# include <sys/file.h>
+# include <sys/stat.h>
+# include <unistd.h>
+#endif
+
+#if ZEN_WITH_TESTS
+# include <zencore/compress.h>
+# include <algorithm>
+# include <random>
+#endif
+
+// Formats a GcClock::TimePoint as the text produced by std::ctime.
+// NOTE(review): std::ctime output ends with a newline and uses a shared static
+// buffer, so the formatted text carries that newline - confirm this is intended.
+template<>
+struct fmt::formatter<zen::GcClock::TimePoint> : formatter<string_view>
+{
+    template<typename FormatContext>
+    auto format(const zen::GcClock::TimePoint& TimePoint, FormatContext& ctx)
+    {
+        const std::time_t                CalendarTime = std::chrono::system_clock::to_time_t(TimePoint);
+        zen::ExtendableStringBuilder<32> Text;
+        Text << std::ctime(&CalendarTime);
+        return formatter<string_view>::format(Text.ToView(), ctx);
+    }
+};
+
+namespace zen {
+
+using namespace std::literals;
+namespace fs = std::filesystem;
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace {
+    // Makes sure the GC disk reserve file at Path exists with exactly Size bytes.
+    //
+    // A Size of 0 removes the file instead. If a regular file of the requested
+    // size already exists nothing is done. Otherwise the file is (re)created and
+    // extended to Size; if any step after creation fails, the partially prepared
+    // file is deleted again and the error is returned.
+    //
+    // Returns a default constructed error_code on success.
+    std::error_code CreateGCReserve(const std::filesystem::path& Path, uint64_t Size)
+    {
+        if (Size == 0)
+        {
+            std::filesystem::remove(Path);
+            return std::error_code{};
+        }
+        CreateDirectories(Path.parent_path());
+        if (std::filesystem::is_regular_file(Path) && std::filesystem::file_size(Path) == Size)
+        {
+            return std::error_code();
+        }
+#if ZEN_PLATFORM_WINDOWS
+        DWORD dwCreationDisposition = CREATE_ALWAYS;
+        DWORD dwDesiredAccess       = GENERIC_READ | GENERIC_WRITE;
+
+        const DWORD dwShareMode          = 0;
+        const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
+        HANDLE      hTemplateFile        = nullptr;
+
+        HANDLE FileHandle = CreateFile(Path.c_str(),
+                                       dwDesiredAccess,
+                                       dwShareMode,
+                                       /* lpSecurityAttributes */ nullptr,
+                                       dwCreationDisposition,
+                                       dwFlagsAndAttributes,
+                                       hTemplateFile);
+
+        if (FileHandle == INVALID_HANDLE_VALUE)
+        {
+            return MakeErrorCodeFromLastError();
+        }
+        // Starts out false so that every early return below removes the unfinished file;
+        // only flipped to true once the file has reached its final size
+        bool Keep = false;
+        auto _    = MakeGuard([&]() {
+            ::CloseHandle(FileHandle);
+            if (!Keep)
+            {
+                ::DeleteFile(Path.c_str());
+            }
+        });
+        LARGE_INTEGER liFileSize;
+        liFileSize.QuadPart = Size;
+        BOOL OK             = ::SetFilePointerEx(FileHandle, liFileSize, 0, FILE_BEGIN);
+        if (!OK)
+        {
+            return MakeErrorCodeFromLastError();
+        }
+        OK = ::SetEndOfFile(FileHandle);
+        if (!OK)
+        {
+            return MakeErrorCodeFromLastError();
+        }
+        Keep = true;
+#else
+        int OpenFlags = O_CLOEXEC | O_RDWR | O_CREAT;
+        int Fd        = open(Path.c_str(), OpenFlags, 0666);
+        if (Fd < 0)
+        {
+            return MakeErrorCodeFromLastError();
+        }
+
+        // Starts out false so that every early return below removes the unfinished file;
+        // only flipped to true once the file has reached its final size
+        bool Keep = false;
+        auto _    = MakeGuard([&]() {
+            close(Fd);
+            if (!Keep)
+            {
+                unlink(Path.c_str());
+            }
+        });
+
+        if (fchmod(Fd, 0666) < 0)
+        {
+            return MakeErrorCodeFromLastError();
+        }
+
+#    if ZEN_PLATFORM_MAC
+        if (ftruncate(Fd, (off_t)Size) < 0)
+        {
+            return MakeErrorCodeFromLastError();
+        }
+#    else
+        if (ftruncate64(Fd, (off64_t)Size) < 0)
+        {
+            return MakeErrorCodeFromLastError();
+        }
+        // Reserve the blocks as well, not just the logical size
+        int Error = posix_fallocate64(Fd, 0, (off64_t)Size);
+        if (Error)
+        {
+            return MakeErrorCode(Error);
+        }
+#    endif
+        Keep = true;
+#endif
+        return std::error_code{};
+    }
+
+}  // namespace
+
+//////////////////////////////////////////////////////////////////////////
+
+// Reads the file at Path and returns it as a compact binary object. An empty
+// CbObject is returned when the file cannot be read or when its contents do not
+// pass compact binary validation (CbValidateMode::All).
+CbObject
+LoadCompactBinaryObject(const fs::path& Path)
+{
+    FileContents Contents = ReadFile(Path);
+    if (Contents.ErrorCode)
+    {
+        return CbObject();
+    }
+
+    IoBuffer              Buffer     = Contents.Flatten();
+    const CbValidateError Validation = ValidateCompactBinary(Buffer, CbValidateMode::All);
+    if (Validation != CbValidateError::None)
+    {
+        return CbObject();
+    }
+
+    return LoadCompactBinaryObject(Buffer);
+}
+
+// Writes the buffer backing Object to the file at Path.
+void
+SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
+{
+    IoBuffer Serialized = Object.GetBuffer().AsIoBuffer();
+    WriteFile(Path, std::move(Serialized));
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// State behind GcContext, owned through GcContext::m_State
+struct GcContext::GcState
+{
+    // Expired cache keys, grouped by the cache key context they were registered under
+    using CacheKeyContexts = std::unordered_map<std::string, std::vector<IoHash>>;
+
+    CacheKeyContexts   m_ExpiredCacheKeys;
+    HashKeySet         m_RetainedCids;
+    HashKeySet         m_DeletedCids;
+    GcClock::TimePoint m_ExpireTime;
+    bool               m_DeletionMode{true};
+    bool               m_CollectSmallObjects{false};
+
+    std::filesystem::path DiskReservePath;
+};
+
+// Creates a context with a fresh GcState whose expire time is ExpireTime
+GcContext::GcContext(const GcClock::TimePoint& ExpireTime) : m_State(std::make_unique<GcState>())
+{
+    GcState& State     = *m_State;
+    State.m_ExpireTime = ExpireTime;
+}
+
+// Defined in this file, where GcState is a complete type
+GcContext::~GcContext() = default;
+
+// Adds Cids to the set of retained content ids
+void
+GcContext::AddRetainedCids(std::span<const IoHash> Cids)
+{
+    HashKeySet& Retained = m_State->m_RetainedCids;
+    Retained.AddHashesToSet(Cids);
+}
+
+// Stores ExpiredKeys as the expired key list for CacheKeyContext, replacing any previous list
+void
+GcContext::SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys)
+{
+    std::vector<IoHash>& Slot = m_State->m_ExpiredCacheKeys[CacheKeyContext];
+    Slot                      = std::move(ExpiredKeys);
+}
+
+// Invokes Callback for every hash in the retained set
+void
+GcContext::IterateCids(std::function<void(const IoHash&)> Callback)
+{
+    m_State->m_RetainedCids.IterateHashes([&Callback](const IoHash& Cid) { Callback(Cid); });
+}
+
+// Runs Cid through the retained set's FilterHashes, forwarding each reported hash to KeepFunc
+void
+GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc)
+{
+    m_State->m_RetainedCids.FilterHashes(Cid, [&KeepFunc](const IoHash& KeptHash) { KeepFunc(KeptHash); });
+}
+
+// Overload whose callback takes a hash together with a bool; FilterFunc is handed
+// to the retained set's FilterHashes unchanged.
+void
+GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc)
+{
+    HashKeySet& Retained = m_State->m_RetainedCids;
+    Retained.FilterHashes(Cid, std::move(FilterFunc));
+}
+
+// Adds Cas to the set of deleted content ids
+void
+GcContext::AddDeletedCids(std::span<const IoHash> Cas)
+{
+    HashKeySet& Deleted = m_State->m_DeletedCids;
+    Deleted.AddHashesToSet(Cas);
+}
+
+// Gives read access to the set of content ids recorded through AddDeletedCids
+const HashKeySet&
+GcContext::DeletedCids()
+{
+    const HashKeySet& Deleted = m_State->m_DeletedCids;
+    return Deleted;
+}
+
+// Returns the expired keys registered for CacheKeyContext, or an empty span when
+// nothing was registered under that context.
+//
+// The lookup uses find() rather than operator[] so that querying an unknown
+// context does not insert an empty entry into the map from a const accessor.
+std::span<const IoHash>
+GcContext::ExpiredCacheKeys(const std::string& CacheKeyContext) const
+{
+    const auto It = m_State->m_ExpiredCacheKeys.find(CacheKeyContext);
+    if (It == m_State->m_ExpiredCacheKeys.end())
+    {
+        return {};
+    }
+    return It->second;
+}
+
+// Returns the deletion mode flag (true by default)
+bool
+GcContext::IsDeletionMode() const
+{
+    const bool Enabled = m_State->m_DeletionMode;
+    return Enabled;
+}
+
+// Sets the deletion mode flag
+void
+GcContext::SetDeletionMode(bool NewState)
+{
+    GcState& State       = *m_State;
+    State.m_DeletionMode = NewState;
+}
+
+// Returns the collect-small-objects flag (false by default)
+bool
+GcContext::CollectSmallObjects() const
+{
+    const bool Enabled = m_State->m_CollectSmallObjects;
+    return Enabled;
+}
+
+// Sets the collect-small-objects flag
+void
+GcContext::CollectSmallObjects(bool NewState)
+{
+    GcState& State              = *m_State;
+    State.m_CollectSmallObjects = NewState;
+}
+
+// Returns the expire time this context was constructed with
+GcClock::TimePoint
+GcContext::ExpireTime() const
+{
+    const GcClock::TimePoint Expiry = m_State->m_ExpireTime;
+    return Expiry;
+}
+
+// Records the location of the disk reserve file used by ClaimGCReserve()
+void
+GcContext::DiskReservePath(const std::filesystem::path& Path)
+{
+    GcState& State        = *m_State;
+    State.DiskReservePath = Path;
+}
+
+// Deletes the disk reserve file and returns the number of bytes it occupied.
+// Returns 0 when the reserve path is not a regular file or the file could not
+// be removed.
+uint64_t
+GcContext::ClaimGCReserve()
+{
+    const std::filesystem::path& ReservePath = m_State->DiskReservePath;
+    if (!std::filesystem::is_regular_file(ReservePath))
+    {
+        return 0;
+    }
+
+    // The size has to be read before the file is removed
+    const uint64_t ReclaimedSize = std::filesystem::file_size(ReservePath);
+    const bool     WasRemoved    = std::filesystem::remove(ReservePath);
+    return WasRemoved ? ReclaimedSize : 0;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Keeps a reference to the manager and registers this contributor with it.
+GcContributor::GcContributor(GcManager& Gc) : m_Gc(Gc)
+{
+ m_Gc.AddGcContributor(this);
+}
+
+// Unregisters this contributor from the manager it was registered with.
+GcContributor::~GcContributor()
+{
+ m_Gc.RemoveGcContributor(this);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Keeps a reference to the manager and registers this storage with it.
+GcStorage::GcStorage(GcManager& Gc) : m_Gc(Gc)
+{
+ m_Gc.AddGcStorage(this);
+}
+
+// Unregisters this storage from the manager it was registered with.
+GcStorage::~GcStorage()
+{
+ m_Gc.RemoveGcStorage(this);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Binds the manager to the "gc" logger; contributors and storages register themselves later.
+GcManager::GcManager() : m_Log(logging::Get("gc"))
+{
+}
+
+// Destructor does no work of its own; members clean themselves up.
+GcManager::~GcManager() = default;
+
+// Registers Contributor so that it takes part in the reference gathering phase of
+// CollectGarbage. Contributor must not be null (same contract as AddGcStorage).
+void
+GcManager::AddGcContributor(GcContributor* Contributor)
+{
+    ZEN_ASSERT(Contributor != nullptr);
+    RwLock::ExclusiveLockScope _(m_Lock);
+    m_GcContribs.push_back(Contributor);
+}
+
+// Removes every registration of Contributor; a contributor that was never registered is
+// silently ignored.
+void
+GcManager::RemoveGcContributor(GcContributor* Contributor)
+{
+    RwLock::ExclusiveLockScope _(m_Lock);
+    // Plain value erase: avoids the predicate lambda and its non-standard '$' identifier
+    std::erase(m_GcContribs, Contributor);
+}
+
+// Registers Storage so that it takes part in the storage trimming phase of CollectGarbage
+// and in TotalStorageSize. Storage must not be null.
+void
+GcManager::AddGcStorage(GcStorage* Storage)
+{
+ ZEN_ASSERT(Storage != nullptr);
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_GcStorage.push_back(Storage);
+}
+
+// Removes every registration of Storage; a storage that was never registered is silently
+// ignored.
+void
+GcManager::RemoveGcStorage(GcStorage* Storage)
+{
+    RwLock::ExclusiveLockScope _(m_Lock);
+    // Plain value erase: avoids the predicate lambda and its non-standard '$' identifier
+    std::erase(m_GcStorage, Storage);
+}
+
+// Runs one garbage collection pass while holding the shared lock: every registered
+// contributor first reports its references into GcCtx, then every registered storage is
+// asked to collect garbage. Both phases log their duration; the second also logs how much
+// disk space and memory the storages shrank by.
+void
+GcManager::CollectGarbage(GcContext& GcCtx)
+{
+    RwLock::SharedLockScope _(m_Lock);
+
+    // Phase 1: gather the reference set
+    {
+        Stopwatch  GatherTimer;
+        const auto LogOnExit =
+            MakeGuard([&] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(GatherTimer.GetElapsedTimeMs())); });
+        for (GcContributor* Contrib : m_GcContribs)
+        {
+            Contrib->GatherReferences(GcCtx);
+        }
+    }
+
+    // Phase 2: trim storage
+    {
+        GcStorageSize Reclaimed;
+        Stopwatch     TrimTimer;
+        const auto    LogOnExit = MakeGuard([&] {
+            ZEN_INFO("collected garbage in {}. Removed {} disk space, {} memory",
+                     NiceTimeSpanMs(TrimTimer.GetElapsedTimeMs()),
+                     NiceBytes(Reclaimed.DiskSize),
+                     NiceBytes(Reclaimed.MemorySize));
+        });
+        for (GcStorage* Store : m_GcStorage)
+        {
+            const auto Before = Store->StorageSize();
+            Store->CollectGarbage(GcCtx);
+            const auto After = Store->StorageSize();
+
+            // Only shrinkage counts; a storage that grew during the pass contributes nothing
+            if (Before.DiskSize > After.DiskSize)
+            {
+                Reclaimed.DiskSize += Before.DiskSize - After.DiskSize;
+            }
+            if (Before.MemorySize > After.MemorySize)
+            {
+                Reclaimed.MemorySize += Before.MemorySize - After.MemorySize;
+            }
+        }
+    }
+}
+
+// Returns the disk and memory sizes summed over all registered storages. The shared lock is
+// held while the storages are queried.
+GcStorageSize
+GcManager::TotalStorageSize() const
+{
+    RwLock::SharedLockScope _(m_Lock);
+
+    GcStorageSize Sum;
+    for (GcStorage* Store : m_GcStorage)
+    {
+        const auto StoreSize = Store->StorageSize();
+        Sum.DiskSize += StoreSize.DiskSize;
+        Sum.MemorySize += StoreSize.MemorySize;
+    }
+    return Sum;
+}
+
+#if ZEN_USE_REF_TRACKING
+// Reference tracking hook; currently a no-op that ignores Hashes.
+void
+GcManager::OnNewCidReferences(std::span<IoHash> Hashes)
+{
+ ZEN_UNUSED(Hashes);
+}
+
+// Reference tracking hook; currently a no-op that ignores Hashes.
+void
+GcManager::OnCommittedCidReferences(std::span<IoHash> Hashes)
+{
+ ZEN_UNUSED(Hashes);
+}
+
+// Reference tracking hook; currently a no-op that ignores Hashes.
+void
+GcManager::OnDroppedCidReferences(std::span<IoHash> Hashes)
+{
+ ZEN_UNUSED(Hashes);
+}
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+// Trims the window to the samples inside [StartTick, EndTick): the leading samples older than
+// StartTick are dropped, then everything from the first sample at or past EndTick onwards.
+// If every sample is older than StartTick the window ends up empty.
+void
+DiskUsageWindow::KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick)
+{
+    // Skip the leading samples that are too old
+    auto FirstKept = m_LogWindow.begin();
+    while (FirstKept != m_LogWindow.end() && FirstKept->SampleTime < StartTick)
+    {
+        ++FirstKept;
+    }
+    if (FirstKept == m_LogWindow.end())
+    {
+        m_LogWindow.clear();
+        return;
+    }
+    m_LogWindow.erase(m_LogWindow.begin(), FirstKept);
+
+    // Cut the tail at the first sample that is too new
+    for (auto Candidate = m_LogWindow.begin(); Candidate != m_LogWindow.end(); ++Candidate)
+    {
+        if (Candidate->SampleTime >= EndTick)
+        {
+            m_LogWindow.erase(Candidate, m_LogWindow.end());
+            break;
+        }
+    }
+}
+
+// Splits [StartTick, EndTick) into consecutive buckets of DeltaWidth ticks and returns one
+// value per bucket: the sum of the disk usage increases recorded by the samples whose
+// SampleTime falls inside that bucket. An increase is measured against the preceding sample
+// in the window; a decrease counts as zero. The first sample of the window has no
+// predecessor and therefore never contributes.
+//
+// OutMaxDelta is raised to the largest bucket sum produced here. It is never lowered, so the
+// caller is expected to initialize it. An empty or reversed range yields an empty result.
+std::vector<uint64_t>
+DiskUsageWindow::GetDiskDeltas(GcClock::Tick StartTick, GcClock::Tick EndTick, GcClock::Tick DeltaWidth, uint64_t& OutMaxDelta) const
+{
+    ZEN_ASSERT(StartTick != -1);
+    ZEN_ASSERT(DeltaWidth > 0);
+
+    std::vector<uint64_t> Result;
+    // Only reserve for a non-empty range: a reversed range would make the bucket count negative,
+    // which turns into a huge unsigned element count
+    if (EndTick > StartTick)
+    {
+        Result.reserve(static_cast<size_t>((EndTick - StartTick + DeltaWidth - 1) / DeltaWidth));
+    }
+
+    size_t        WindowSize      = m_LogWindow.size();
+    GcClock::Tick FirstWindowTick = WindowSize < 2 ? EndTick : m_LogWindow[1].SampleTime;
+
+    // Buckets that end before the first sample with a predecessor are empty
+    GcClock::Tick RangeStart = StartTick;
+    while (FirstWindowTick >= RangeStart + DeltaWidth && RangeStart < EndTick)
+    {
+        Result.push_back(0);
+        RangeStart += DeltaWidth;
+    }
+
+    uint64_t DeltaSum    = 0;
+    size_t   WindowIndex = 1;
+    while (WindowIndex < WindowSize && RangeStart < EndTick)
+    {
+        const DiskUsageEntry& Entry = m_LogWindow[WindowIndex];
+        if (Entry.SampleTime < RangeStart)
+        {
+            // Sample predates the current bucket
+            ++WindowIndex;
+            continue;
+        }
+        GcClock::Tick RangeEnd = Min(EndTick, RangeStart + DeltaWidth);
+        ZEN_ASSERT(Entry.SampleTime >= RangeStart);
+        if (Entry.SampleTime >= RangeEnd)
+        {
+            // Sample belongs to a later bucket: close the current one and revisit the sample
+            Result.push_back(DeltaSum);
+            OutMaxDelta = Max(DeltaSum, OutMaxDelta);
+            DeltaSum    = 0;
+            RangeStart  = RangeEnd;
+            continue;
+        }
+        const DiskUsageEntry& PrevEntry = m_LogWindow[WindowIndex - 1];
+        if (Entry.DiskUsage > PrevEntry.DiskUsage)
+        {
+            uint64_t Delta = Entry.DiskUsage - PrevEntry.DiskUsage;
+            DeltaSum += Delta;
+        }
+        WindowIndex++;
+    }
+
+    // Flush the bucket in progress, then pad the remaining buckets with zero
+    while (RangeStart < EndTick)
+    {
+        Result.push_back(DeltaSum);
+        OutMaxDelta = Max(DeltaSum, OutMaxDelta);
+        DeltaSum    = 0;
+        RangeStart += DeltaWidth;
+    }
+    return Result;
+}
+
+// Walks the window from the oldest sample and accumulates disk usage increases between
+// consecutive samples until at least Amount has been accounted for. Returns the tick just
+// past the sample that reached Amount, or EndTick when a sample at or past EndTick is hit
+// first or the window runs out.
+GcClock::Tick
+DiskUsageWindow::FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick) const
+{
+    ZEN_ASSERT(Amount > 0);
+
+    uint64_t StillNeeded = Amount;
+    for (size_t Index = 1; Index < m_LogWindow.size(); ++Index)
+    {
+        const DiskUsageEntry& Current = m_LogWindow[Index];
+        if (Current.SampleTime >= EndTick)
+        {
+            break;
+        }
+
+        // Usage that shrank between two samples does not count
+        const DiskUsageEntry& Previous = m_LogWindow[Index - 1];
+        uint64_t              Growth   = 0;
+        if (Current.DiskUsage > Previous.DiskUsage)
+        {
+            Growth = Current.DiskUsage - Previous.DiskUsage;
+        }
+
+        if (Growth >= StillNeeded)
+        {
+            return Current.SampleTime + 1;
+        }
+        StillNeeded -= Growth;
+    }
+    return EndTick;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Binds the scheduler to the "gc" logger and to the manager it schedules. No thread is
+// started here; that happens in Initialize.
+GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager)
+{
+}
+
+// Calls Shutdown, which stops and joins the scheduler thread and closes the disk usage log.
+GcScheduler::~GcScheduler()
+{
+ Shutdown();
+}
+
+// Applies Config, makes sure the root directory and the GC disk reserve exist, restores the
+// persisted scheduler state and the disk usage log, and finally starts the scheduler thread.
+void
+GcScheduler::Initialize(const GcSchedulerConfig& Config)
+{
+    using namespace std::chrono;
+
+    m_Config = Config;
+
+    // A non-zero GC interval is never allowed to be shorter than the monitor interval
+    if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval)
+    {
+        m_Config.Interval = m_Config.MonitorInterval;
+    }
+
+    std::filesystem::create_directories(m_Config.RootDirectory);
+
+    const std::filesystem::path ReservePath = m_Config.RootDirectory / "reserve.gc";
+    std::error_code             Ec          = CreateGCReserve(ReservePath, m_Config.DiskReserveSize);
+    if (Ec)
+    {
+        // Not fatal: the scheduler still runs, just without a reserve to fall back on
+        ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'",
+                 ReservePath,
+                 NiceBytes(m_Config.DiskReserveSize),
+                 Ec.message());
+    }
+
+    // Defaults used when no scheduler state has been persisted yet
+    m_LastGcTime       = GcClock::Now();
+    m_LastGcExpireTime = GcClock::TimePoint::min();
+
+    if (CbObject SchedulerState = LoadCompactBinaryObject(m_Config.RootDirectory / "gc_state"))
+    {
+        m_LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64()));
+        m_LastGcExpireTime =
+            GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcExpireTime"sv].AsInt64(GcClock::Duration::min().count())));
+        if (m_LastGcTime + m_Config.Interval < GcClock::Now())
+        {
+            // TODO: Trigger GC?
+            m_LastGcTime = GcClock::Now();
+        }
+    }
+
+    m_DiskUsageLog.Open(m_Config.RootDirectory / "gc.dlog", CasLogFile::Mode::kWrite);
+    m_DiskUsageLog.Initialize();
+
+    // Only samples taken at or after the last GC's expire time go back into the in-memory window
+    const auto OldestTickToKeep = m_LastGcExpireTime.time_since_epoch().count();
+    m_DiskUsageLog.Replay(
+        [this, OldestTickToKeep](const DiskUsageWindow::DiskUsageEntry& Entry) {
+            if (Entry.SampleTime >= OldestTickToKeep)
+            {
+                m_DiskUsageWindow.Append(Entry);
+            }
+        },
+        0);
+
+    m_NextGcTime = NextGcTime(m_LastGcTime);
+    m_GcThread   = std::thread(&GcScheduler::SchedulerThread, this);
+}
+
+// Stops the scheduler: flags the status as stopped, wakes the scheduler thread and joins it
+// (after logging a notice if a collection was in progress), then flushes and closes the disk
+// usage log. The log is flushed and closed even when the scheduler was already stopped.
+void
+GcScheduler::Shutdown()
+{
+    const uint32_t StoppedValue = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
+    const uint32_t RunningValue = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+
+    if (m_Status != StoppedValue)
+    {
+        const bool WasCollecting = m_Status == RunningValue;
+        m_Status                 = StoppedValue;
+        m_GcSignal.notify_one();
+
+        if (m_GcThread.joinable())
+        {
+            if (WasCollecting)
+            {
+                ZEN_INFO("Waiting for garbage collection to complete");
+            }
+            m_GcThread.join();
+        }
+    }
+
+    m_DiskUsageLog.Flush();
+    m_DiskUsageLog.Close();
+}
+
+// Requests an immediate garbage collection with the given parameters. Returns true when the
+// scheduler was idle and has been switched to running (the scheduler thread is then woken),
+// and false when GC is disabled or the scheduler is not idle.
+bool
+GcScheduler::Trigger(const GcScheduler::TriggerParams& Params)
+{
+    if (!m_Config.Enabled)
+    {
+        return false;
+    }
+
+    std::unique_lock Lock(m_GcMutex);
+
+    uint32_t ExpectedState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
+    if (m_Status != ExpectedState)
+    {
+        return false;
+    }
+
+    // Parameters are published before the status flips so the scheduler thread can pick them up
+    m_TriggerParams = Params;
+    if (!m_Status.compare_exchange_strong(ExpectedState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
+    {
+        return false;
+    }
+
+    m_GcSignal.notify_one();
+    return true;
+}
+
+// Body of the scheduler thread. Each pass waits on m_GcSignal for WaitTime and then:
+//  - exits when the status is kStopped,
+//  - goes back to waiting (with the maximum wait time) when GC is disabled,
+//  - ignores wake-ups that find the scheduler still idle without a timeout,
+//  - on a timeout while idle: samples the total storage size, logs a disk usage report and
+//    decides whether a disk size triggered or schedule triggered GC is due,
+//  - otherwise (status already switched away from idle, e.g. by Trigger) runs a GC directly.
+//
+// NOTE(review): m_TriggerParams is assigned under m_GcMutex in Trigger() but is read and
+// reset here without holding that mutex - confirm this cannot race with a concurrent Trigger().
+void
+GcScheduler::SchedulerThread()
+{
+    std::chrono::seconds WaitTime{0};
+
+    for (;;)
+    {
+        bool Timeout = false;
+        {
+            ZEN_ASSERT(WaitTime.count() >= 0);
+            std::unique_lock Lock(m_GcMutex);
+            Timeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, WaitTime);
+        }
+
+        if (Status() == GcSchedulerStatus::kStopped)
+        {
+            break;
+        }
+
+        if (!m_Config.Enabled)
+        {
+            WaitTime = std::chrono::seconds::max();
+            continue;
+        }
+
+        if (!Timeout && Status() == GcSchedulerStatus::kIdle)
+        {
+            continue;
+        }
+
+        // Start from the configured settings; an explicit trigger may override some of them
+        bool                 Delete              = true;
+        bool                 CollectSmallObjects = m_Config.CollectSmallObjects;
+        std::chrono::seconds MaxCacheDuration    = m_Config.MaxCacheDuration;
+        uint64_t             DiskSizeSoftLimit   = m_Config.DiskSizeSoftLimit;
+        GcClock::TimePoint   Now                 = GcClock::Now();
+        if (m_TriggerParams)
+        {
+            const auto TriggerParams = m_TriggerParams.value();
+            m_TriggerParams.reset();
+
+            CollectSmallObjects = TriggerParams.CollectSmallObjects;
+            if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max())
+            {
+                MaxCacheDuration = TriggerParams.MaxCacheDuration;
+            }
+            if (TriggerParams.DiskSizeSoftLimit != 0)
+            {
+                DiskSizeSoftLimit = TriggerParams.DiskSizeSoftLimit;
+            }
+        }
+
+        GcClock::TimePoint ExpireTime = MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration;
+
+        std::error_code     Ec;
+        const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
+
+        if (Timeout && Status() == GcSchedulerStatus::kIdle)
+        {
+            DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
+            if (Ec)
+            {
+                ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message());
+            }
+
+            // The load graph covers the last PressureGraphLength monitor intervals, one bucket each
+            const int64_t               PressureGraphLength = 30;
+            const std::chrono::duration LoadGraphTime       = PressureGraphLength * m_Config.MonitorInterval;
+            std::vector<uint64_t>       DiskDeltas;
+            uint64_t                    MaxLoad = 0;
+            {
+                const GcClock::Tick EpochTickCount = GcClock::Now().time_since_epoch().count();
+                std::unique_lock    Lock(m_GcMutex);
+                m_DiskUsageWindow.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
+                m_DiskUsageLog.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
+                const GcClock::TimePoint LoadGraphStartTime = Now - LoadGraphTime;
+                GcClock::Tick            Start              = LoadGraphStartTime.time_since_epoch().count();
+                GcClock::Tick            End                = Now.time_since_epoch().count();
+                DiskDeltas                                  = m_DiskUsageWindow.GetDiskDeltas(Start,
+                                                             End,
+                                                             Max(1, (End - Start + PressureGraphLength - 1) / PressureGraphLength),
+                                                             MaxLoad);
+            }
+
+            // Render each bucket as a digit 0-9 relative to the busiest bucket
+            std::string LoadGraph;
+            LoadGraph.resize(DiskDeltas.size(), '0');
+            if (DiskDeltas.size() > 0 && MaxLoad > 0)
+            {
+                char LoadIndicator[11] = "0123456789";
+                for (size_t Index = 0; Index < DiskDeltas.size(); ++Index)
+                {
+                    size_t LoadIndex = (9 * DiskDeltas[Index] + MaxLoad - 1) / MaxLoad;
+                    LoadGraph[Index] = LoadIndicator[LoadIndex];
+                }
+            }
+
+            // MaxLoad is the largest amount written within a single graph bucket, so the per-second
+            // peak is relative to the width of one bucket, not to the whole graph. The zero check
+            // also keeps a zero monitor interval from dividing by zero.
+            const uint64_t BucketSeconds     = uint64_t(std::chrono::seconds(LoadGraphTime).count() / PressureGraphLength);
+            const uint64_t PeakLoadPerSecond = BucketSeconds > 0 ? MaxLoad / BucketSeconds : MaxLoad;
+
+            // Above the soft limit: move the expire time forward far enough to reclaim the excess
+            uint64_t GcDiskSpaceGoal = 0;
+            if (DiskSizeSoftLimit != 0 && TotalSize.DiskSize > DiskSizeSoftLimit)
+            {
+                GcDiskSpaceGoal = TotalSize.DiskSize - DiskSizeSoftLimit;
+                std::unique_lock   Lock(m_GcMutex);
+                GcClock::Tick      AgeTick = m_DiskUsageWindow.FindTimepointThatRemoves(GcDiskSpaceGoal, Now.time_since_epoch().count());
+                GcClock::TimePoint SizeBasedExpireTime = GcClock::TimePointFromTick(AgeTick);
+                if (SizeBasedExpireTime > ExpireTime)
+                {
+                    ExpireTime = SizeBasedExpireTime;
+                }
+            }
+
+            bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0;
+
+            std::chrono::seconds RemainingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now());
+
+            if (RemainingTime < std::chrono::seconds::zero())
+            {
+                RemainingTime = std::chrono::seconds::zero();
+            }
+
+            bool TimeBasedGCTriggered = !DiskSpaceGCTriggered && RemainingTime.count() == 0;
+            ZEN_INFO(
+                "{} in use,{} {} of total {} free disk space, disk writes last {} per {} [{}], peak {}/s. {}",
+                NiceBytes(TotalSize.DiskSize),
+                DiskSizeSoftLimit == 0 ? "" : fmt::format(" {} soft limit,", NiceBytes(DiskSizeSoftLimit)),
+                NiceBytes(Space.Free),
+                NiceBytes(Space.Total),
+                NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count())),
+                NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count() / PressureGraphLength)),
+                LoadGraph,
+                NiceBytes(PeakLoadPerSecond),
+                DiskSpaceGCTriggered   ? fmt::format("Disk use threshold triggered, trying to reclaim {}. ", NiceBytes(GcDiskSpaceGoal))
+                : TimeBasedGCTriggered ? "GC schedule triggered."
+                : m_NextGcTime == GcClock::TimePoint::max()
+                    ? ""
+                    : fmt::format("{} until next scheduled GC.", NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemainingTime).count()))));
+
+            if (!DiskSpaceGCTriggered && !TimeBasedGCTriggered)
+            {
+                // Nothing to do yet: sleep until the next monitor tick or the scheduled GC, whichever is sooner
+                WaitTime = m_Config.MonitorInterval < RemainingTime ? m_Config.MonitorInterval : RemainingTime;
+                continue;
+            }
+
+            WaitTime           = m_Config.MonitorInterval;
+            uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
+            if (!m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
+            {
+                // Status changed underneath us (triggered or stopped); let the next pass sort it out
+                continue;
+            }
+        }
+
+        CollectGarbage(ExpireTime, Delete, CollectSmallObjects);
+
+        uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+        if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
+        {
+            // The only state that may replace kRunning while a GC is in flight is kStopped
+            ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped));
+            break;
+        }
+
+        WaitTime = m_Config.MonitorInterval;
+    }
+}
+
+// Returns the time of the next scheduled GC relative to CurrentTime, or the maximum time
+// point when the configured interval is zero (no scheduled GC).
+GcClock::TimePoint
+GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime)
+{
+    if (m_Config.Interval.count() == 0)
+    {
+        return GcClock::TimePoint::max();
+    }
+    return CurrentTime + m_Config.Interval;
+}
+
+// Runs one garbage collection through the manager with the given expire time and flags, then
+// updates the bookkeeping: trims the disk usage window (deletion mode only), records the GC
+// times, persists them to the "gc_state" file and re-creates the disk reserve.
+void
+GcScheduler::CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects)
+{
+    const auto ReservePath = m_Config.RootDirectory / "reserve.gc";
+
+    GcContext GcCtx(ExpireTime);
+    GcCtx.SetDeletionMode(Delete);
+    GcCtx.CollectSmallObjects(CollectSmallObjects);
+    GcCtx.DiskReservePath(ReservePath);
+
+    ZEN_INFO("garbage collection STARTING, small objects gc {}, cutoff time {}",
+             GcCtx.CollectSmallObjects() ? "ENABLED"sv : "DISABLED"sv,
+             ExpireTime);
+    {
+        // Logs the total duration once everything in this scope has finished
+        Stopwatch  GcTimer;
+        const auto LogOnExit = MakeGuard([&] { ZEN_INFO("garbage collection DONE in {}", NiceTimeSpanMs(GcTimer.GetElapsedTimeMs())); });
+
+        m_GcManager.CollectGarbage(GcCtx);
+
+        if (Delete)
+        {
+            // Samples older than the expire time describe data that has just been collected
+            m_LastGcExpireTime = ExpireTime;
+            std::unique_lock WindowLock(m_GcMutex);
+            m_DiskUsageWindow.KeepRange(ExpireTime.time_since_epoch().count(), GcClock::Duration::max().count());
+        }
+
+        m_LastGcTime = GcClock::Now();
+        m_NextGcTime = NextGcTime(m_LastGcTime);
+
+        {
+            const fs::path StatePath = m_Config.RootDirectory / "gc_state";
+            ZEN_DEBUG("saving scheduler state to '{}'", StatePath);
+            CbObjectWriter StateWriter;
+            StateWriter << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count());
+            StateWriter << "LastGcExpireTime"sv << static_cast<int64_t>(m_LastGcExpireTime.time_since_epoch().count());
+            SaveCompactBinaryObject(StatePath, StateWriter.Save());
+        }
+
+        // The reserve may have been claimed during the collection, so put it back
+        if (std::error_code ReserveEc = CreateGCReserve(ReservePath, m_Config.DiskReserveSize); ReserveEc)
+        {
+            ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason: '{}'",
+                     ReservePath,
+                     NiceBytes(m_Config.DiskReserveSize),
+                     ReserveEc.message());
+        }
+    }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+namespace gc::impl {
+    // Test helper: returns an IoBuffer holding a copy of Size bytes, namely the values
+    // 0, 1, 2, ... (truncated to 8 bits) in randomly shuffled order.
+    static IoBuffer CreateChunk(uint64_t Size)
+    {
+        static std::random_device Seed;
+        static std::mt19937       Rng(Seed());
+
+        std::vector<uint8_t> Bytes(Size);
+        for (size_t Pos = 0; Pos < Bytes.size(); ++Pos)
+        {
+            Bytes[Pos] = static_cast<uint8_t>(Pos);
+        }
+        std::shuffle(Bytes.begin(), Bytes.end(), Rng);
+
+        return IoBufferBuilder::MakeCloneFromMemory(Bytes.data(), Bytes.size());
+    }
+
+    // Test helper: compresses the contents of Buffer through a view over its memory.
+    static CompressedBuffer Compress(IoBuffer Buffer)
+    {
+        return CompressedBuffer::Compress(SharedBuffer::MakeView(Buffer.GetData(), Buffer.GetSize()));
+    }
+}  // namespace gc::impl
+
+// A chunk that nothing retains must be gone after a small-object garbage collection.
+TEST_CASE("gc.basic")
+{
+    using namespace gc::impl;
+
+    ScopedTemporaryDirectory TempDir;
+
+    CidStoreConfiguration CasConfig;
+    CasConfig.RootDirectory = TempDir.Path() / "cas";
+
+    GcManager Gc;
+    CidStore  Store(Gc);
+    Store.Initialize(CasConfig);
+
+    // Store one compressed chunk under its raw hash
+    auto       CompressedChunk = Compress(CreateChunk(128));
+    const auto RawHash         = CompressedChunk.DecodeRawHash();
+    const auto InsertResult    = Store.AddChunk(CompressedChunk.GetCompressed().Flatten().AsIoBuffer(), RawHash);
+    CHECK(InsertResult.New);
+
+    // Collect without retaining anything
+    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+    GcCtx.CollectSmallObjects(true);
+
+    Store.Flush();
+    Gc.CollectGarbage(GcCtx);
+
+    CHECK(!Store.ContainsChunk(RawHash));
+}
+
+// Runs several garbage collections over the same nine chunks, each time retaining a different
+// subset, and checks after every pass that exactly the retained chunks survived intact.
+TEST_CASE("gc.full")
+{
+    using namespace gc::impl;
+
+    ScopedTemporaryDirectory TempDir;
+
+    CidStoreConfiguration CasConfig;
+    CasConfig.RootDirectory = TempDir.Path() / "cas";
+
+    GcManager                 Gc;
+    std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc);
+
+    CasStore->Initialize(CasConfig);
+
+    constexpr size_t ChunkCount             = 9;
+    const uint64_t   ChunkSizes[ChunkCount] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
+
+    std::vector<IoBuffer> Chunks;
+    Chunks.reserve(ChunkCount);
+    for (const uint64_t ChunkSize : ChunkSizes)
+    {
+        Chunks.push_back(CreateChunk(ChunkSize));
+    }
+
+    std::vector<IoHash> ChunkHashes;
+    ChunkHashes.reserve(ChunkCount);
+    for (IoBuffer& Chunk : Chunks)
+    {
+        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+    }
+
+    // Inserts the chunks at the given indices, in the order listed
+    const auto InsertChunks = [&](const std::vector<size_t>& Indices) {
+        for (const size_t Index : Indices)
+        {
+            CasStore->InsertChunk(Chunks[Index], ChunkHashes[Index]);
+        }
+    };
+
+    // Runs a small-object GC that retains exactly the chunks at KeptIndices, then checks that
+    // precisely those chunks are still present and still hash to their id
+    const auto CollectKeeping = [&](const std::vector<size_t>& KeptIndices) {
+        GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+        GcCtx.CollectSmallObjects(true);
+
+        std::vector<IoHash> KeepChunks;
+        for (const size_t Index : KeptIndices)
+        {
+            KeepChunks.push_back(ChunkHashes[Index]);
+        }
+        GcCtx.AddRetainedCids(KeepChunks);
+
+        CasStore->Flush();
+        Gc.CollectGarbage(GcCtx);
+
+        for (size_t Index = 0; Index < ChunkCount; ++Index)
+        {
+            const bool ExpectKept = std::find(KeptIndices.begin(), KeptIndices.end(), Index) != KeptIndices.end();
+            CHECK(CasStore->ContainsChunk(ChunkHashes[Index]) == ExpectKept);
+        }
+        for (const size_t Index : KeptIndices)
+        {
+            CHECK(ChunkHashes[Index] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[Index])));
+        }
+    };
+
+    InsertChunks({0, 1, 2, 3, 4, 5, 6, 7, 8});
+
+    CidStoreSize InitialSize = CasStore->TotalSize();
+
+    // Keep first and last
+    CollectKeeping({0, 8});
+    InsertChunks({1, 2, 3, 4, 5, 6, 7});
+
+    // Keep last
+    CollectKeeping({8});
+    InsertChunks({1, 2, 3, 4, 5, 6, 7});
+
+    // Keep mixed
+    CollectKeeping({1, 4, 7});
+    InsertChunks({0, 2, 3, 5, 6, 8});
+
+    // Keep multiple at end
+    CollectKeeping({6, 7, 8});
+    InsertChunks({0, 1, 2, 3, 4, 5});
+
+    // Verify that we nicely appended blocks even after all GC operations
+    for (size_t Index = 0; Index < ChunkCount; ++Index)
+    {
+        CHECK(ChunkHashes[Index] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[Index])));
+    }
+
+    auto FinalSize = CasStore->TotalSize();
+
+    CHECK_LE(InitialSize.TinySize, FinalSize.TinySize);
+    CHECK_GE(InitialSize.TinySize + (1u << 28), FinalSize.TinySize);
+}
+
+// Exercises DiskUsageWindow: trimming with KeepRange, bucketed deltas from GetDiskDeltas and
+// the size based lookup in FindTimepointThatRemoves.
+TEST_CASE("gc.diskusagewindow")
+{
+    using namespace gc::impl;
+
+    // Trailing comments: sample index and the increase relative to the previous sample
+    DiskUsageWindow Stats;
+    Stats.Append({.SampleTime = 0, .DiskUsage = 0});    // 0 0
+    Stats.Append({.SampleTime = 10, .DiskUsage = 10});  // 1 10
+    Stats.Append({.SampleTime = 20, .DiskUsage = 20});  // 2 10
+    Stats.Append({.SampleTime = 30, .DiskUsage = 20});  // 3 0
+    Stats.Append({.SampleTime = 40, .DiskUsage = 15});  // 4 0
+    Stats.Append({.SampleTime = 50, .DiskUsage = 25});  // 5 10
+    Stats.Append({.SampleTime = 60, .DiskUsage = 30});  // 6 5
+    Stats.Append({.SampleTime = 70, .DiskUsage = 45});  // 7 15
+
+    // Fetches the deltas for [Start, End) in buckets of Width ticks and compares the bucket
+    // count, the reported maximum and every bucket against the expectation
+    const auto CheckDeltas = [&](GcClock::Tick                Start,
+                                 GcClock::Tick                End,
+                                 GcClock::Tick                Width,
+                                 uint64_t                     ExpectedMax,
+                                 const std::vector<uint64_t>& Expected) {
+        uint64_t                    MaxDelta   = 0;
+        const std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(Start, End, Width, MaxDelta);
+        CHECK(DiskDeltas.size() == Expected.size());
+        CHECK(MaxDelta == ExpectedMax);
+        for (size_t Index = 0; Index < Expected.size() && Index < DiskDeltas.size(); ++Index)
+        {
+            CHECK(DiskDeltas[Index] == Expected[Index]);
+        }
+    };
+
+    SUBCASE("Truncate start")
+    {
+        Stats.KeepRange(-15, 31);
+        CHECK(Stats.m_LogWindow.size() == 4);
+        CHECK(Stats.m_LogWindow[0].SampleTime == 0);
+        CHECK(Stats.m_LogWindow[3].SampleTime == 30);
+    }
+
+    SUBCASE("Truncate end")
+    {
+        Stats.KeepRange(70, 71);
+        CHECK(Stats.m_LogWindow.size() == 1);
+        CHECK(Stats.m_LogWindow[0].SampleTime == 70);
+    }
+
+    SUBCASE("Truncate middle")
+    {
+        Stats.KeepRange(29, 69);
+        CHECK(Stats.m_LogWindow.size() == 4);
+        CHECK(Stats.m_LogWindow[0].SampleTime == 30);
+        CHECK(Stats.m_LogWindow[3].SampleTime == 60);
+    }
+
+    SUBCASE("Full range")
+    {
+        // 0-10, 10-20, 20-30, 30-40, 40-50, 50-60, 60-70, 70-80
+        CheckDeltas(0, 80, 10, 15, {0, 10, 10, 0, 0, 10, 5, 15});
+    }
+
+    SUBCASE("Sub range")
+    {
+        // [20:30] [30:40]
+        CheckDeltas(20, 40, 10, 10, {10, 0});
+    }
+    SUBCASE("Unaligned sub range 1")
+    {
+        // [21:31] [31:41] [41:51]
+        CheckDeltas(21, 51, 10, 10, {0, 0, 10});
+    }
+    SUBCASE("Unaligned end range")
+    {
+        // [29:39] [39:49] [49:59] [59:69] [69:79]
+        CheckDeltas(29, 79, 10, 15, {0, 0, 10, 5, 15});
+    }
+    SUBCASE("Ahead of window")
+    {
+        // [-40:-30] [-30:-20] [-20:-10] [-10:0]
+        CheckDeltas(-40, 0, 10, 0, {0, 0, 0, 0});
+    }
+    SUBCASE("After of window")
+    {
+        // [90:100] [100:110] [110:120]
+        CheckDeltas(90, 120, 10, 0, {0, 0, 0});
+    }
+    SUBCASE("Encapsulating window")
+    {
+        // [-20:-10] [-10:0] [0:10] [10:20] [20:30] [30:40] [40:50] [50:60] [60:70] [70:80] [80:90] [90:100]
+        CheckDeltas(-20, 100, 10, 15, {0, 0, 0, 10, 10, 0, 0, 10, 5, 15, 0, 0});
+    }
+
+    SUBCASE("Full range half stride")
+    {
+        // [0:20] [20:40] [40:60] [60:80]
+        CheckDeltas(0, 80, 20, 20, {10, 10, 10, 20});
+    }
+
+    SUBCASE("Partial odd stride")
+    {
+        // [13:31] [31:49] [49:67]
+        CheckDeltas(13, 67, 18, 15, {10, 0, 15});
+    }
+
+    SUBCASE("Find size window")
+    {
+        DiskUsageWindow Empty;
+        CHECK(Empty.FindTimepointThatRemoves(15u, 10000) == 10000);
+
+        CHECK(Stats.FindTimepointThatRemoves(15u, 40) == 21);
+        CHECK(Stats.FindTimepointThatRemoves(15u, 20) == 20);
+        CHECK(Stats.FindTimepointThatRemoves(100000u, 50) == 50);
+        // The window cannot account for this amount, so the end tick itself comes back
+        CHECK(Stats.FindTimepointThatRemoves(100000u, 1000) == 1000);
+    }
+}
+#endif
+
+// Intentionally empty function.
+// NOTE(review): presumably referenced from elsewhere so the linker keeps this translation
+// unit (and the tests in it) - confirm against the caller.
+void
+gc_forcelink()
+{
+}
+
+} // namespace zen
diff --git a/src/zenstore/hashkeyset.cpp b/src/zenstore/hashkeyset.cpp
new file mode 100644
index 000000000..a5436f5cb
--- /dev/null
+++ b/src/zenstore/hashkeyset.cpp
@@ -0,0 +1,60 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/hashkeyset.h>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+// Inserts a single hash into the set.
+void
+HashKeySet::AddHashToSet(const IoHash& HashToAdd)
+{
+ m_HashSet.insert(HashToAdd);
+}
+
+// Inserts every hash in HashesToAdd into the set.
+void
+HashKeySet::AddHashesToSet(std::span<const IoHash> HashesToAdd)
+{
+ m_HashSet.insert(HashesToAdd.begin(), HashesToAdd.end());
+}
+
+// Removes every hash for which Predicate returns true; Predicate is called once per hash
+// currently in the set.
+void
+HashKeySet::RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
+{
+    auto Cursor = m_HashSet.begin();
+    while (Cursor != m_HashSet.end())
+    {
+        if (Predicate(*Cursor))
+        {
+            // erase hands back the iterator following the removed element
+            Cursor = m_HashSet.erase(Cursor);
+        }
+        else
+        {
+            ++Cursor;
+        }
+    }
+}
+
+// Calls Callback once for every hash in the set, in the set's own iteration order.
+void
+HashKeySet::IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const
+{
+    for (const auto& Hash : m_HashSet)
+    {
+        Callback(Hash);
+    }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Testing related code follows...
+//
+
+#if ZEN_WITH_TESTS
+
+// Intentionally empty function, only compiled when tests are enabled.
+// NOTE(review): presumably referenced from elsewhere so the linker keeps this translation
+// unit - confirm against the caller.
+void
+hashkeyset_forcelink()
+{
+}
+
+#endif
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
new file mode 100644
index 000000000..857ccae38
--- /dev/null
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -0,0 +1,175 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/filesystem.h>
+#include <zencore/zencore.h>
+#include <zenutil/basicfile.h>
+
+#include <unordered_map>
+#include <unordered_set>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+// Unpacked chunk location: which block, plus offset and size within it.
+// NOTE(review): Offset and Size are presumably byte counts - confirm against blockstore.cpp.
+struct BlockStoreLocation
+{
+ uint32_t BlockIndex;
+ uint64_t Offset;
+ uint64_t Size;
+
+ // Member-wise comparison in declaration order (BlockIndex, Offset, Size).
+ inline auto operator<=>(const BlockStoreLocation& Rhs) const = default;
+};
+
+#pragma pack(push)
+#pragma pack(1)
+
+// Compact form of BlockStoreLocation: a 32-bit size plus a 48-bit field that packs
+// the block index (upper 20 bits) and the alignment-scaled offset (lower 28 bits).
+// The six packed bytes are copied to/from the start of a uint64_t, so the encoding
+// relies on little-endian byte order.
+struct BlockStoreDiskLocation
+{
+    constexpr static uint32_t MaxBlockIndexBits = 20;
+    constexpr static uint32_t MaxOffsetBits     = 28;
+    constexpr static uint32_t MaxBlockIndex     = (1ul << BlockStoreDiskLocation::MaxBlockIndexBits) - 1ul;
+    constexpr static uint32_t MaxOffset         = (1ul << BlockStoreDiskLocation::MaxOffsetBits) - 1ul;
+
+    // Packs Location; the offset is stored divided by OffsetAlignment (integer division).
+    BlockStoreDiskLocation(const BlockStoreLocation& Location, uint64_t OffsetAlignment)
+    {
+        Init(Location.BlockIndex, Location.Offset / OffsetAlignment, Location.Size);
+    }
+
+    BlockStoreDiskLocation() = default;
+
+    // Expands back to a BlockStoreLocation, scaling the stored offset by OffsetAlignment.
+    inline BlockStoreLocation Get(uint64_t OffsetAlignment) const
+    {
+        const uint64_t Packed = LoadPacked();
+        return {.BlockIndex = static_cast<std::uint32_t>(Packed >> MaxOffsetBits),
+                .Offset     = (Packed & MaxOffset) * OffsetAlignment,
+                .Size       = GetSize()};
+    }
+
+    inline uint32_t GetBlockIndex() const { return static_cast<std::uint32_t>(LoadPacked() >> MaxOffsetBits); }
+
+    inline uint64_t GetOffset(uint64_t OffsetAlignment) const { return (LoadPacked() & MaxOffset) * OffsetAlignment; }
+
+    inline uint64_t GetSize() const { return m_Size; }
+
+    inline auto operator<=>(const BlockStoreDiskLocation& Rhs) const = default;
+
+private:
+    // Reads the six packed bytes into the low-address bytes of a zeroed 64-bit value.
+    inline uint64_t LoadPacked() const
+    {
+        uint64_t Packed = 0;
+        memcpy(&Packed, &m_Offset, sizeof m_Offset);
+        return Packed;
+    }
+
+    // Validates the ranges and stores the size and the packed block-index/offset pair.
+    inline void Init(uint32_t BlockIndex, uint64_t Offset, uint64_t Size)
+    {
+        ZEN_ASSERT(BlockIndex <= MaxBlockIndex);
+        ZEN_ASSERT(Offset <= MaxOffset);
+        ZEN_ASSERT(Size <= std::numeric_limits<std::uint32_t>::max());
+
+        const uint64_t Packed = (static_cast<uint64_t>(BlockIndex) << MaxOffsetBits) + Offset;
+        m_Size                = static_cast<uint32_t>(Size);
+        memcpy(&m_Offset[0], &Packed, sizeof m_Offset);
+    }
+
+    uint32_t m_Size;
+    uint8_t  m_Offset[6];
+};
+
+#pragma pack(pop)
+
+// Ref-counted object representing one block file, identified by the path given at construction.
+// NOTE(review): only declarations are visible here; the behaviour notes below are inferred from signatures - confirm against the definitions.
+struct BlockStoreFile : public RefCounted
+{
+ explicit BlockStoreFile(const std::filesystem::path& BlockPath);
+ ~BlockStoreFile();
+ const std::filesystem::path& GetPath() const;
+ // Open() / Create(): presumably open an existing file vs. create one with InitialSize - confirm.
+ void Open();
+ void Create(uint64_t InitialSize);
+ void MarkAsDeleteOnClose();
+ uint64_t FileSize();
+ // Chunk and raw access, addressed by offset/size within the file.
+ IoBuffer GetChunk(uint64_t Offset, uint64_t Size);
+ void Read(void* Data, uint64_t Size, uint64_t FileOffset);
+ void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
+ void Flush();
+ BasicFile& GetBasicFile();
+ // Passes the requested byte range to ChunkFun as (Data, Size) pieces.
+ void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
+
+private:
+ const std::filesystem::path m_Path;
+ IoBuffer m_IoBuffer;
+ BasicFile m_File;
+};
+
+// Stores chunks inside numbered block files under a base path and hands out
+// BlockStoreLocation values that identify where each chunk was written.
+class BlockStore
+{
+public:
+    // State captured by GetReclaimSnapshotState() and later passed to ReclaimSpace().
+    struct ReclaimSnapshotState
+    {
+        std::unordered_set<uint32_t> m_ActiveWriteBlocks;
+        size_t                       BlockCount;
+    };
+
+    using MovedChunksArray = std::vector<std::pair<size_t, BlockStoreLocation>>;
+    using ChunkIndexArray  = std::vector<size_t>;
+
+    using ReclaimCallback                = std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)>;
+    using ClaimDiskReserveCallback       = std::function<uint64_t()>;
+    using IterateChunksSmallSizeCallback = std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)>;
+    using IterateChunksLargeSizeCallback = std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)>;
+    using WriteChunkCallback             = std::function<void(const BlockStoreLocation& Location)>;
+
+    void Initialize(const std::filesystem::path&           BlocksBasePath,
+                    uint64_t                               MaxBlockSize,
+                    uint64_t                               MaxBlockCount,
+                    const std::vector<BlockStoreLocation>& KnownLocations);
+    void Close();
+
+    // Writes Data and reports the resulting location through Callback.
+    void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback);
+
+    IoBuffer TryGetChunk(const BlockStoreLocation& Location) const;
+    void     Flush();
+
+    ReclaimSnapshotState GetReclaimSnapshotState();
+    // By default ChangeCallback does nothing and DiskReserveCallback reports 0.
+    void ReclaimSpace(
+        const ReclaimSnapshotState&            Snapshot,
+        const std::vector<BlockStoreLocation>& ChunkLocations,
+        const ChunkIndexArray&                 KeepChunkIndexes,
+        uint64_t                               PayloadAlignment,
+        bool                                   DryRun,
+        const ReclaimCallback&                 ChangeCallback      = [](const MovedChunksArray&, const ChunkIndexArray&) {},
+        const ClaimDiskReserveCallback&        DiskReserveCallback = []() { return 0; });
+
+    void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
+                       const IterateChunksSmallSizeCallback&  SmallSizeCallback,
+                       const IterateChunksLargeSizeCallback&  LargeSizeCallback);
+
+    static const char*           GetBlockFileExtension();
+    static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);
+
+    // Relaxed atomic read of the running total.
+    inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }
+
+private:
+    std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
+
+    mutable RwLock        m_InsertLock;  // used to serialize inserts
+    Ref<BlockStoreFile>   m_WriteBlock;
+    std::uint64_t         m_CurrentInsertOffset = 0;
+    std::atomic_uint32_t  m_WriteBlockIndex{};
+    std::vector<uint32_t> m_ActiveWriteBlocks;
+
+    uint64_t              m_MaxBlockSize  = 1u << 28;
+    uint64_t              m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1;
+    std::filesystem::path m_BlocksBasePath;
+
+    std::atomic_uint64_t m_TotalSize{};
+};
+
+void blockstore_forcelink();
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/caslog.h b/src/zenstore/include/zenstore/caslog.h
new file mode 100644
index 000000000..d8c3f22f3
--- /dev/null
+++ b/src/zenstore/include/zenstore/caslog.h
@@ -0,0 +1,91 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/uid.h>
+#include <zenutil/basicfile.h>
+
+namespace zen {
+
+// Log file of fixed-size records; RecordSize is supplied when the log is opened or validated.
+// NOTE(review): the on-disk layout (64-byte header followed by records) is inferred from the
+// declarations here - confirm against caslog.cpp.
+class CasLogFile
+{
+public:
+    CasLogFile();
+    ~CasLogFile();
+
+    enum class Mode
+    {
+        kRead,
+        kWrite,
+        kTruncate
+    };
+
+    static bool IsValid(std::filesystem::path FileName, size_t RecordSize);
+    void        Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode);
+    // Appends DataSize raw bytes starting at DataPointer.
+    void        Append(const void* DataPointer, uint64_t DataSize);
+    // Calls Handler with a pointer to each record; SkipEntryCount presumably skips leading records - confirm.
+    void        Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount);
+    void        Flush();
+    void        Close();
+    uint64_t    GetLogSize();
+    uint64_t    GetLogCount();
+
+private:
+    struct FileHeader
+    {
+        // Magic and Pad are value-initialized so a default-constructed header never
+        // carries indeterminate bytes.
+        uint8_t  Magic[16]  = {};
+        uint32_t RecordSize = 0;
+        Oid      LogId;
+        uint32_t ValidatedTail = 0;
+        uint32_t Pad[6]        = {};
+        uint32_t Checksum      = 0;
+
+        static const inline uint8_t MagicSequence[16] = {'.', '-', '=', ' ', 'C', 'A', 'S', 'L', 'O', 'G', 'v', '1', ' ', '=', '-', '.'};
+
+        ZENCORE_API uint32_t ComputeChecksum();
+        void                 Finalize() { Checksum = ComputeChecksum(); }
+    };
+
+    // The header size is fixed; changing any field above must keep it at 64 bytes.
+    static_assert(sizeof(FileHeader) == 64);
+
+private:
+    void Open(std::filesystem::path FileName, size_t RecordSize, BasicFile::Mode Mode);
+
+    BasicFile             m_File;
+    FileHeader            m_Header;
+    size_t                m_RecordSize   = 1;
+    std::atomic<uint64_t> m_AppendOffset = 0;
+};
+
+// Typed facade over CasLogFile for records of type T; the record size is always sizeof(T).
+// Records are read and written as raw bytes of T.
+template<typename T>
+class TCasLogFile : public CasLogFile
+{
+public:
+    static bool IsValid(std::filesystem::path FileName) { return CasLogFile::IsValid(FileName, sizeof(T)); }
+    void        Open(std::filesystem::path FileName, Mode Mode) { CasLogFile::Open(FileName, sizeof(T), Mode); }
+
+    // This should be called before the Replay() is called to do some basic sanity checking
+    bool Initialize() { return true; }
+
+    // Invokes Handler with each replayed record, viewed as a const T&.
+    void Replay(Invocable<const T&> auto Handler, uint64_t SkipEntryCount)
+    {
+        CasLogFile::Replay(
+            [&](const void* VoidPtr) {
+                const T& Record = *reinterpret_cast<const T*>(VoidPtr);
+
+                Handler(Record);
+            },
+            SkipEntryCount);
+    }
+
+    // Appends a single record.
+    void Append(const T& Record)
+    {
+        // TODO: implement some more efficient path here so we don't end up with
+        //		 a syscall per append
+
+        CasLogFile::Append(&Record, sizeof Record);
+    }
+
+    // Appends a contiguous run of records with one underlying Append call.
+    // Takes a span of const T so read-only data (const containers, std::span<const T>) is
+    // accepted as well; std::span<T> and mutable containers still convert implicitly.
+    void Append(std::span<const T> Records) { CasLogFile::Append(Records.data(), Records.size_bytes()); }
+};
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
new file mode 100644
index 000000000..16ca78225
--- /dev/null
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -0,0 +1,87 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenstore.h"
+
+#include <zencore/iohash.h>
+#include <zenstore/hashkeyset.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <filesystem>
+
+namespace zen {
+
+class GcManager;
+class CasStore;
+class CompressedBuffer;
+class IoBuffer;
+class ScrubContext;
+
+/** Content Store
+ *
+ * Data in the content store is referenced by content identifiers (CIDs). The store works
+ * with compressed buffers, so the CID is expected to be the RAW hash, and each chunk is
+ * stored directly under that RAW hash.
+ * This class maps uncompressed hashes (CIDs) to compressed hashes and may
+ * be used to deal with other kinds of indirections in the future. For example, if we want
+ * to support chunking then a CID may represent a list of chunks which could be concatenated
+ * to form the referenced chunk.
+ *
+ */
+
+// Size breakdown returned by CidStore::TotalSize(); all fields default to 0.
+// NOTE(review): the tiny/small/large buckets presumably correspond to the value-size strategies in CidStoreConfiguration - confirm.
+struct CidStoreSize
+{
+ uint64_t TinySize = 0;
+ uint64_t SmallSize = 0;
+ uint64_t LargeSize = 0;
+ uint64_t TotalSize = 0;
+};
+
+// Settings passed to CidStore::Initialize().
+struct CidStoreConfiguration
+{
+ // Root directory for CAS store
+ std::filesystem::path RootDirectory;
+
+ // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy
+ // (default 1024).
+ uint64_t TinyValueThreshold = 1024;
+
+ // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy
+ // (default 1024 * 1024).
+ uint64_t HugeValueThreshold = 1024 * 1024;
+};
+
+// Public interface of the content store. The implementation lives behind the
+// forward-declared Impl and an owned CasStore, both held through std::unique_ptr.
+class CidStore
+{
+public:
+ CidStore(GcManager& Gc);
+ ~CidStore();
+
+ // Result of AddChunk(); New defaults to false.
+ // NOTE(review): presumably set when the chunk was not already stored - confirm.
+ struct InsertResult
+ {
+ bool New = false;
+ };
+ // How AddChunk() may treat the supplied buffer; AddChunk() defaults to kMayBeMovedInPlace.
+ enum class InsertMode
+ {
+ kCopyOnly,
+ kMayBeMovedInPlace
+ };
+
+ void Initialize(const CidStoreConfiguration& Config);
+ InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId);
+ bool ContainsChunk(const IoHash& DecompressedId);
+ void FilterChunks(HashKeySet& InOutChunks);
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+ CidStoreSize TotalSize() const;
+
+private:
+ struct Impl;
+ std::unique_ptr<CasStore> m_CasStore;
+ std::unique_ptr<Impl> m_Impl;
+};
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
new file mode 100644
index 000000000..e0354b331
--- /dev/null
+++ b/src/zenstore/include/zenstore/gc.h
@@ -0,0 +1,242 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+#include <zencore/thread.h>
+#include <zenstore/caslog.h>
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <filesystem>
+#include <functional>
+#include <optional>
+#include <span>
+#include <thread>
+
+#define ZEN_USE_REF_TRACKING 0 // This is not currently functional
+
+namespace spdlog {
+class logger;
+}
+
+namespace zen {
+
+class HashKeySet;
+class GcManager;
+class CidStore;
+struct IoHash;
+
+/** GC clock
+ */
+/** GC clock
+ *
+ * Thin wrapper over std::chrono::system_clock; a Tick is the raw count of the
+ * clock's native duration since its epoch.
+ */
+class GcClock
+{
+public:
+    using Clock     = std::chrono::system_clock;
+    using TimePoint = Clock::time_point;
+    using Duration  = Clock::duration;
+    using Tick      = int64_t;
+
+    // Current time according to Clock.
+    static TimePoint Now() { return Clock::now(); }
+
+    // Current time expressed as a tick count.
+    static Tick TickCount()
+    {
+        const Duration SinceEpoch = Now().time_since_epoch();
+        return SinceEpoch.count();
+    }
+
+    // Inverse of TickCount(): rebuilds the time point that lies TickCount ticks after the epoch.
+    static TimePoint TimePointFromTick(const Tick TickCount)
+    {
+        const Duration Elapsed{TickCount};
+        return TimePoint{Elapsed};
+    }
+};
+
+/** Garbage Collection context object
+ */
+// Carries the inputs and intermediate results of one garbage collection pass.
+// All state lives in the forward-declared GcState, held through std::unique_ptr.
+// NOTE(review): only declarations are visible here; the grouping comments below are inferred from names - confirm against gc.cpp.
+class GcContext
+{
+public:
+ GcContext(const GcClock::TimePoint& ExpireTime);
+ ~GcContext();
+
+ // Retained CIDs and expired cache keys (keyed by a context string).
+ void AddRetainedCids(std::span<const IoHash> Cid);
+ void SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys);
+
+ void IterateCids(std::function<void(const IoHash&)> Callback);
+
+ // Two filter flavours: one callback taking just the hash, one also taking a bool.
+ void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc);
+ void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc);
+
+ void AddDeletedCids(std::span<const IoHash> Cas);
+ const HashKeySet& DeletedCids();
+
+ std::span<const IoHash> ExpiredCacheKeys(const std::string& CacheKeyContext) const;
+
+ // Getter/setter pairs for the pass options.
+ bool IsDeletionMode() const;
+ void SetDeletionMode(bool NewState);
+
+ bool CollectSmallObjects() const;
+ void CollectSmallObjects(bool NewState);
+
+ GcClock::TimePoint ExpireTime() const;
+
+ void DiskReservePath(const std::filesystem::path& Path);
+ uint64_t ClaimGCReserve();
+
+private:
+ struct GcState;
+
+ std::unique_ptr<GcState> m_State;
+};
+
+/** GC root contributor
+
+ Higher level data structures provide roots for the garbage collector,
+ which ultimately determine what is garbage and what data we need to
+ retain.
+
+ */
+// Abstract base for objects that contribute references to a garbage collection pass.
+// Derived classes implement GatherReferences(); the GcManager passed to the
+// constructor is kept by reference in m_Gc.
+class GcContributor
+{
+public:
+    GcContributor(GcManager& Gc);
+    // Virtual because this is a polymorphic base: destroying a derived object
+    // through a GcContributor* must run the derived destructor.
+    virtual ~GcContributor();
+
+    virtual void GatherReferences(GcContext& GcCtx) = 0;
+
+protected:
+    GcManager& m_Gc;
+};
+
+// Pair of size counters reported by GcStorage::StorageSize() and GcManager::TotalStorageSize(); both default to 0.
+struct GcStorageSize
+{
+ uint64_t DiskSize{};
+ uint64_t MemorySize{};
+};
+
+/** GC storage provider
+ */
+/** GC storage provider
+ *
+ * Abstract base for storage back-ends that can collect garbage and report
+ * their size. The GcManager passed to the constructor is kept by reference.
+ */
+class GcStorage
+{
+public:
+    GcStorage(GcManager& Gc);
+    // Virtual because this is a polymorphic base: destroying a derived object
+    // through a GcStorage* must run the derived destructor.
+    virtual ~GcStorage();
+
+    virtual void          CollectGarbage(GcContext& GcCtx) = 0;
+    virtual GcStorageSize StorageSize() const              = 0;
+
+private:
+    GcManager& m_Gc;
+};
+
+/** GC orchestrator
+ */
+// Keeps the registered GcContributor and GcStorage objects as raw pointers and
+// exposes CollectGarbage() / TotalStorageSize() over them.
+// NOTE(review): the pointers look non-owning (paired Add/Remove calls) - confirm against gc.cpp.
+class GcManager
+{
+public:
+ GcManager();
+ ~GcManager();
+
+ void AddGcContributor(GcContributor* Contributor);
+ void RemoveGcContributor(GcContributor* Contributor);
+
+ // NOTE(review): the parameter is a GcStorage* although it is named Contributor.
+ void AddGcStorage(GcStorage* Contributor);
+ void RemoveGcStorage(GcStorage* Contributor);
+
+ void CollectGarbage(GcContext& GcCtx);
+
+ GcStorageSize TotalStorageSize() const;
+
+// Compiled out: ZEN_USE_REF_TRACKING is defined as 0 in this header.
+#if ZEN_USE_REF_TRACKING
+ void OnNewCidReferences(std::span<IoHash> Hashes);
+ void OnCommittedCidReferences(std::span<IoHash> Hashes);
+ void OnDroppedCidReferences(std::span<IoHash> Hashes);
+#endif
+
+private:
+ spdlog::logger& Log() { return m_Log; }
+ spdlog::logger& m_Log;
+ mutable RwLock m_Lock;
+ std::vector<GcContributor*> m_GcContribs;
+ std::vector<GcStorage*> m_GcStorage;
+ CidStore* m_CidStore = nullptr;
+};
+
+// Scheduler state as returned by GcScheduler::Status(); stored there in a 32-bit atomic.
+// kIdle is the zero value.
+enum class GcSchedulerStatus : uint32_t
+{
+ kIdle,
+ kRunning,
+ kStopped
+};
+
+// Settings passed to GcScheduler::Initialize().
+// NOTE(review): field meanings below are inferred from names and defaults - confirm against gc.cpp.
+struct GcSchedulerConfig
+{
+ std::filesystem::path RootDirectory;
+ std::chrono::seconds MonitorInterval{30}; // defaults to 30 seconds
+ std::chrono::seconds Interval{}; // defaults to zero
+ std::chrono::seconds MaxCacheDuration{86400}; // defaults to 86400 seconds (24 hours)
+ bool CollectSmallObjects = true;
+ bool Enabled = true;
+ uint64_t DiskReserveSize = 1ul << 28; // 2^28 = 268435456
+ uint64_t DiskSizeSoftLimit = 0;
+};
+
+// In-memory list of (sample time, disk usage) entries, plus queries over that list.
+class DiskUsageWindow
+{
+public:
+    // One sample: the tick at which it was taken and the disk usage recorded at that tick.
+    struct DiskUsageEntry
+    {
+        GcClock::Tick SampleTime;
+        uint64_t      DiskUsage;
+    };
+
+    std::vector<DiskUsageEntry> m_LogWindow;
+
+    // Both overloads add the entry at the end of m_LogWindow.
+    inline void Append(const DiskUsageEntry& Entry) { m_LogWindow.emplace_back(Entry); }
+    inline void Append(DiskUsageEntry&& Entry) { m_LogWindow.push_back(std::move(Entry)); }
+
+    void                  KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick);
+    std::vector<uint64_t> GetDiskDeltas(GcClock::Tick StartTick,
+                                        GcClock::Tick EndTick,
+                                        GcClock::Tick DeltaWidth,
+                                        uint64_t&     OutMaxDelta) const;
+    GcClock::Tick         FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick) const;
+};
+
+/**
+ * GC scheduler
+ */
+// Runs garbage collection through a GcManager, either on its own schedule or on demand via Trigger().
+// Owns a thread, a mutex and a condition variable, plus a disk usage log and window.
+// NOTE(review): the scheduling behaviour is not visible in this header - confirm against gc.cpp.
+class GcScheduler
+{
+public:
+ GcScheduler(GcManager& GcManager);
+ ~GcScheduler();
+
+ void Initialize(const GcSchedulerConfig& Config);
+ void Shutdown();
+ // m_Status holds the enum value as a 32-bit atomic; this converts it back.
+ GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
+
+ // Options for a manually triggered collection. Note the defaults differ from GcSchedulerConfig:
+ // small objects are not collected and the cache duration is the maximum representable value.
+ struct TriggerParams
+ {
+ bool CollectSmallObjects = false;
+ std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max();
+ uint64_t DiskSizeSoftLimit = 0;
+ };
+
+ bool Trigger(const TriggerParams& Params);
+
+private:
+ void SchedulerThread();
+ void CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects);
+ GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime);
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
+ GcManager& m_GcManager;
+ GcSchedulerConfig m_Config;
+ GcClock::TimePoint m_LastGcTime{};
+ GcClock::TimePoint m_LastGcExpireTime{};
+ GcClock::TimePoint m_NextGcTime{};
+ std::atomic_uint32_t m_Status{};
+ std::thread m_GcThread;
+ std::mutex m_GcMutex;
+ std::condition_variable m_GcSignal;
+ std::optional<TriggerParams> m_TriggerParams;
+
+ TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog;
+ DiskUsageWindow m_DiskUsageWindow;
+};
+
+void gc_forcelink();
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/hashkeyset.h b/src/zenstore/include/zenstore/hashkeyset.h
new file mode 100644
index 000000000..411a6256e
--- /dev/null
+++ b/src/zenstore/include/zenstore/hashkeyset.h
@@ -0,0 +1,54 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenstore.h"
+
+#include <zencore/iohash.h>
+
+#include <functional>
+#include <unordered_set>
+
+namespace zen {
+
+/** Manage a set of IoHash values
+ */
+
+// Unordered set of IoHash values with bulk add/remove/iterate helpers.
+// The class does no locking of its own (see the note on m_HashSet).
+class HashKeySet
+{
+public:
+    void AddHashToSet(const IoHash& HashToAdd);
+    void AddHashesToSet(std::span<const IoHash> HashesToAdd);
+    void RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
+    void IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const;
+
+    [[nodiscard]] inline bool   ContainsHash(const IoHash& Hash) const { return m_HashSet.contains(Hash); }
+    [[nodiscard]] inline bool   IsEmpty() const { return m_HashSet.empty(); }
+    [[nodiscard]] inline size_t GetSize() const { return m_HashSet.size(); }
+
+    // Calls MatchFunc(Candidate) for each candidate that is present in the set,
+    // in the order the candidates are given.
+    inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) const
+    {
+        for (const IoHash& Candidate : Candidates)
+        {
+            if (!ContainsHash(Candidate))
+            {
+                continue;
+            }
+            MatchFunc(Candidate);
+        }
+    }
+
+    // Calls MatchFunc(Candidate, IsMember) for every candidate; IsMember tells
+    // whether the candidate is present in the set.
+    inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) const
+    {
+        for (const IoHash& Candidate : Candidates)
+        {
+            const bool IsMember = ContainsHash(Candidate);
+            MatchFunc(Candidate, IsMember);
+        }
+    }
+
+private:
+    // Q: should we protect this with a lock, or is that a higher level concern?
+    std::unordered_set<IoHash, IoHash::Hasher> m_HashSet;
+};
+
+void hashkeyset_forcelink();
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h
new file mode 100644
index 000000000..0b884fcc6
--- /dev/null
+++ b/src/zenstore/include/zenstore/scrubcontext.h
@@ -0,0 +1,41 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/timer.h>
+#include <zenstore/hashkeyset.h>
+
+namespace zen {
+
+/** Context object for data scrubbing
+ *
+ * Data scrubbing is when we traverse stored data to validate it and
+ * optionally correct/recover
+ */
+
+// Collects the results of a scrub pass: counters for scrubbed chunks/bytes
+// and the set of content ids reported as bad.
+class ScrubContext
+{
+public:
+    // The class has a virtual member, so it can be used as a polymorphic base;
+    // a virtual destructor makes destruction through a base pointer well defined.
+    virtual ~ScrubContext() = default;
+
+    // Records the given hashes as bad. Note that this updates a plain HashKeySet, not an atomic.
+    virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); }
+
+    // Timer value captured when this context was constructed.
+    inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
+    inline bool     RunRecovery() const { return m_Recover; }
+
+    // Adds to the running totals; both counters are atomics.
+    void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
+    {
+        m_ChunkCount.fetch_add(ChunkCount);
+        m_ByteCount.fetch_add(ChunkBytes);
+    }
+
+    inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
+    inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
+
+    // Returns a reference instead of a copy so callers that only inspect the set
+    // do not duplicate it; the reference is valid for the lifetime of this context.
+    const HashKeySet& BadCids() const { return m_BadCid; }
+
+private:
+    uint64_t              m_ScrubTime = GetHifreqTimerValue();
+    bool                  m_Recover   = true;
+    std::atomic<uint64_t> m_ChunkCount{0};
+    std::atomic<uint64_t> m_ByteCount{0};
+    HashKeySet            m_BadCid;
+};
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h
new file mode 100644
index 000000000..46d62029d
--- /dev/null
+++ b/src/zenstore/include/zenstore/zenstore.h
@@ -0,0 +1,13 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#define ZENSTORE_API
+
+namespace zen {
+
+// Defined in zenstore.cpp when ZEN_WITH_TESTS is set; it calls the per-module *_forcelink() functions.
+ZENSTORE_API void zenstore_forcelinktests();
+
+}
diff --git a/src/zenstore/xmake.lua b/src/zenstore/xmake.lua
new file mode 100644
index 000000000..4469c5650
--- /dev/null
+++ b/src/zenstore/xmake.lua
@@ -0,0 +1,9 @@
+-- Copyright Epic Games, Inc. All Rights Reserved.
+
+-- Static library target 'zenstore': picks up headers and sources via the **.h / **.cpp
+-- patterns, exports 'include' as a public include directory, and depends on the
+-- zencore and zenutil targets plus the vcpkg robin-map package.
+target('zenstore')
+ set_kind("static")
+ add_headerfiles("**.h")
+ add_files("**.cpp")
+ add_includedirs("include", {public=true})
+ add_deps("zencore", "zenutil")
+ add_packages("vcpkg::robin-map")
diff --git a/src/zenstore/zenstore.cpp b/src/zenstore/zenstore.cpp
new file mode 100644
index 000000000..d87652fde
--- /dev/null
+++ b/src/zenstore/zenstore.cpp
@@ -0,0 +1,32 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenstore/zenstore.h"
+
+#if ZEN_WITH_TESTS
+
+# include <zenstore/blockstore.h>
+# include <zenstore/gc.h>
+# include <zenstore/hashkeyset.h>
+# include <zenutil/basicfile.h>
+
+# include "cas.h"
+# include "compactcas.h"
+# include "filecas.h"
+
+namespace zen {
+
+// Calls the *_forcelink() function of each module listed below.
+// NOTE(review): presumably this keeps the linker from dropping the translation units that hold the tests - confirm.
+void
+zenstore_forcelinktests()
+{
+ basicfile_forcelink();
+ CAS_forcelink();
+ filecas_forcelink();
+ blockstore_forcelink();
+ compactcas_forcelink();
+ gc_forcelink();
+ hashkeyset_forcelink();
+}
+
+} // namespace zen
+
+#endif