aboutsummaryrefslogtreecommitdiff
path: root/zenstore
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /zenstore
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'zenstore')
-rw-r--r--zenstore/blockstore.cpp1312
-rw-r--r--zenstore/cas.cpp355
-rw-r--r--zenstore/cas.h67
-rw-r--r--zenstore/caslog.cpp236
-rw-r--r--zenstore/cidstore.cpp125
-rw-r--r--zenstore/compactcas.cpp1511
-rw-r--r--zenstore/compactcas.h95
-rw-r--r--zenstore/filecas.cpp1452
-rw-r--r--zenstore/filecas.h102
-rw-r--r--zenstore/gc.cpp1312
-rw-r--r--zenstore/hashkeyset.cpp60
-rw-r--r--zenstore/include/zenstore/blockstore.h175
-rw-r--r--zenstore/include/zenstore/caslog.h91
-rw-r--r--zenstore/include/zenstore/cidstore.h87
-rw-r--r--zenstore/include/zenstore/gc.h242
-rw-r--r--zenstore/include/zenstore/hashkeyset.h54
-rw-r--r--zenstore/include/zenstore/scrubcontext.h41
-rw-r--r--zenstore/include/zenstore/zenstore.h13
-rw-r--r--zenstore/xmake.lua9
-rw-r--r--zenstore/zenstore.cpp32
20 files changed, 0 insertions, 7371 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
deleted file mode 100644
index 5dfa10c91..000000000
--- a/zenstore/blockstore.cpp
+++ /dev/null
@@ -1,1312 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenstore/blockstore.h>
-
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/timer.h>
-
-#include <algorithm>
-
-#if ZEN_WITH_TESTS
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-# include <zencore/workthreadpool.h>
-# include <random>
-#endif
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-
-//////////////////////////////////////////////////////////////////////////
-
-BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath)
-{
-}
-
-BlockStoreFile::~BlockStoreFile()
-{
- m_IoBuffer = IoBuffer();
- m_File.Detach();
-}
-
-const std::filesystem::path&
-BlockStoreFile::GetPath() const
-{
- return m_Path;
-}
-
-void
-BlockStoreFile::Open()
-{
- m_File.Open(m_Path, BasicFile::Mode::kDelete);
- void* FileHandle = m_File.Handle();
- m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize());
-}
-
-void
-BlockStoreFile::Create(uint64_t InitialSize)
-{
- auto ParentPath = m_Path.parent_path();
- if (!std::filesystem::is_directory(ParentPath))
- {
- CreateDirectories(ParentPath);
- }
-
- m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete);
- void* FileHandle = m_File.Handle();
-
- // We map our m_IoBuffer beyond the file size as we will grow it over time and want
- // to be able to create sub-buffers of all the written range later
- m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize);
-}
-
-uint64_t
-BlockStoreFile::FileSize()
-{
- return m_File.FileSize();
-}
-
-void
-BlockStoreFile::MarkAsDeleteOnClose()
-{
- m_IoBuffer.MarkAsDeleteOnClose();
-}
-
-IoBuffer
-BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size)
-{
- return IoBuffer(m_IoBuffer, Offset, Size);
-}
-
-void
-BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset)
-{
- m_File.Read(Data, Size, FileOffset);
-}
-
-void
-BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
-{
- m_File.Write(Data, Size, FileOffset);
-}
-
-void
-BlockStoreFile::Flush()
-{
- m_File.Flush();
-}
-
-BasicFile&
-BlockStoreFile::GetBasicFile()
-{
- return m_File;
-}
-
-void
-BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
-{
- m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
-}
-
-constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024;
-
-void
-BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
- uint64_t MaxBlockSize,
- uint64_t MaxBlockCount,
- const std::vector<BlockStoreLocation>& KnownLocations)
-{
- ZEN_ASSERT(MaxBlockSize > 0);
- ZEN_ASSERT(MaxBlockCount > 0);
- ZEN_ASSERT(IsPow2(MaxBlockCount));
-
- m_TotalSize = 0;
- m_BlocksBasePath = BlocksBasePath;
- m_MaxBlockSize = MaxBlockSize;
-
- m_ChunkBlocks.clear();
-
- std::unordered_set<uint32_t> KnownBlocks;
- for (const auto& Entry : KnownLocations)
- {
- KnownBlocks.insert(Entry.BlockIndex);
- }
-
- if (std::filesystem::is_directory(m_BlocksBasePath))
- {
- std::vector<std::filesystem::path> FoldersToScan;
- FoldersToScan.push_back(m_BlocksBasePath);
- size_t FolderOffset = 0;
- while (FolderOffset < FoldersToScan.size())
- {
- for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
- {
- if (Entry.is_directory())
- {
- FoldersToScan.push_back(Entry.path());
- continue;
- }
- if (Entry.is_regular_file())
- {
- const std::filesystem::path Path = Entry.path();
- if (Path.extension() != GetBlockFileExtension())
- {
- continue;
- }
- std::string FileName = PathToUtf8(Path.stem());
- uint32_t BlockIndex;
- bool OK = ParseHexNumber(FileName, BlockIndex);
- if (!OK)
- {
- continue;
- }
- if (!KnownBlocks.contains(BlockIndex))
- {
- // Log removing unreferenced block
- // Clear out unused blocks
- ZEN_DEBUG("removing unused block at '{}'", Path);
- std::error_code Ec;
- std::filesystem::remove(Path, Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
- }
- continue;
- }
- Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)};
- BlockFile->Open();
- m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed);
- m_ChunkBlocks[BlockIndex] = BlockFile;
- }
- }
- ++FolderOffset;
- }
- }
- else
- {
- CreateDirectories(m_BlocksBasePath);
- }
-}
-
-void
-BlockStore::Close()
-{
- RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
- m_WriteBlock = nullptr;
- m_CurrentInsertOffset = 0;
- m_WriteBlockIndex = 0;
-
- m_ChunkBlocks.clear();
- m_BlocksBasePath.clear();
-}
-
-void
-BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback)
-{
- ZEN_ASSERT(Data != nullptr);
- ZEN_ASSERT(Size > 0u);
- ZEN_ASSERT(Size <= m_MaxBlockSize);
- ZEN_ASSERT(Alignment > 0u);
-
- RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
-
- uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- bool IsWriting = !!m_WriteBlock;
- if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize)
- {
- if (m_WriteBlock)
- {
- m_WriteBlock = nullptr;
- }
-
- if (m_ChunkBlocks.size() == m_MaxBlockCount)
- {
- throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
- }
-
- WriteBlockIndex += IsWriting ? 1 : 0;
- while (m_ChunkBlocks.contains(WriteBlockIndex))
- {
- WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
- }
-
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
-
- Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath));
- NewBlockFile->Create(m_MaxBlockSize);
-
- m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
- m_WriteBlock = NewBlockFile;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
- m_CurrentInsertOffset = 0;
- }
- uint64_t InsertOffset = m_CurrentInsertOffset;
- m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
- uint64_t AlignedWriteSize = m_CurrentInsertOffset - InsertOffset;
- Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
- m_ActiveWriteBlocks.push_back(WriteBlockIndex);
- InsertLock.ReleaseNow();
-
- WriteBlock->Write(Data, Size, InsertOffset);
- m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed);
-
- Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size});
-
- {
- RwLock::ExclusiveLockScope _(m_InsertLock);
- m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
- }
-}
-
-BlockStore::ReclaimSnapshotState
-BlockStore::GetReclaimSnapshotState()
-{
- ReclaimSnapshotState State;
- RwLock::SharedLockScope _(m_InsertLock);
- for (uint32_t BlockIndex : m_ActiveWriteBlocks)
- {
- State.m_ActiveWriteBlocks.insert(BlockIndex);
- }
- if (m_WriteBlock)
- {
- State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex);
- }
- State.BlockCount = m_ChunkBlocks.size();
- return State;
-}
-
-IoBuffer
-BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
-{
- RwLock::SharedLockScope InsertLock(m_InsertLock);
- if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
- {
- if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block)
- {
- return Block->GetChunk(Location.Offset, Location.Size);
- }
- }
- return IoBuffer();
-}
-
-void
-BlockStore::Flush()
-{
- RwLock::ExclusiveLockScope _(m_InsertLock);
- if (m_CurrentInsertOffset > 0)
- {
- uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
- m_WriteBlock = nullptr;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
- m_CurrentInsertOffset = 0;
- }
-}
-
-void
-BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
- const std::vector<BlockStoreLocation>& ChunkLocations,
- const ChunkIndexArray& KeepChunkIndexes,
- uint64_t PayloadAlignment,
- bool DryRun,
- const ReclaimCallback& ChangeCallback,
- const ClaimDiskReserveCallback& DiskReserveCallback)
-{
- if (ChunkLocations.empty())
- {
- return;
- }
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = ChunkLocations.size();
- uint64_t DeletedSize = 0;
- uint64_t OldTotalSize = 0;
- uint64_t NewTotalSize = 0;
-
- uint64_t MovedCount = 0;
- uint64_t DeletedCount = 0;
-
- Stopwatch TotalTimer;
- const auto _ = MakeGuard([&] {
- ZEN_DEBUG(
- "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
- "{} "
- "of {} "
- "chunks ({}).",
- m_BlocksBasePath,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs),
- NiceBytes(DeletedSize),
- DeletedCount,
- MovedCount,
- TotalChunkCount,
- NiceBytes(OldTotalSize));
- });
-
- size_t BlockCount = Snapshot.BlockCount;
- if (BlockCount == 0)
- {
- ZEN_DEBUG("garbage collect for '{}' SKIPPED, no blocks to process", m_BlocksBasePath);
- return;
- }
-
- std::unordered_set<size_t> KeepChunkMap;
- KeepChunkMap.reserve(KeepChunkIndexes.size());
- for (size_t KeepChunkIndex : KeepChunkIndexes)
- {
- KeepChunkMap.insert(KeepChunkIndex);
- }
-
- std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
- std::vector<ChunkIndexArray> BlockKeepChunks;
- std::vector<ChunkIndexArray> BlockDeleteChunks;
-
- BlockIndexToChunkMapIndex.reserve(BlockCount);
- BlockKeepChunks.reserve(BlockCount);
- BlockDeleteChunks.reserve(BlockCount);
- size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
-
- size_t DeleteCount = 0;
- for (size_t Index = 0; Index < TotalChunkCount; ++Index)
- {
- const BlockStoreLocation& Location = ChunkLocations[Index];
- OldTotalSize += Location.Size;
- if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex))
- {
- continue;
- }
-
- auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex);
- size_t ChunkMapIndex = 0;
- if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
- {
- ChunkMapIndex = BlockKeepChunks.size();
- BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex;
- BlockKeepChunks.resize(ChunkMapIndex + 1);
- BlockKeepChunks.back().reserve(GuesstimateCountPerBlock);
- BlockDeleteChunks.resize(ChunkMapIndex + 1);
- BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock);
- }
- else
- {
- ChunkMapIndex = BlockIndexPtr->second;
- }
-
- if (KeepChunkMap.contains(Index))
- {
- ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex];
- IndexMap.push_back(Index);
- NewTotalSize += Location.Size;
- continue;
- }
- ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex];
- IndexMap.push_back(Index);
- DeleteCount++;
- }
-
- std::unordered_set<uint32_t> BlocksToReWrite;
- BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
- for (const auto& Entry : BlockIndexToChunkMapIndex)
- {
- uint32_t BlockIndex = Entry.first;
- size_t ChunkMapIndex = Entry.second;
- const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex];
- if (ChunkMap.empty())
- {
- continue;
- }
- BlocksToReWrite.insert(BlockIndex);
- }
-
- if (DryRun)
- {
- ZEN_DEBUG("garbage collect for '{}' DISABLED, found {} {} chunks of total {} {}",
- m_BlocksBasePath,
- DeleteCount,
- NiceBytes(OldTotalSize - NewTotalSize),
- TotalChunkCount,
- OldTotalSize);
- return;
- }
-
- Ref<BlockStoreFile> NewBlockFile;
- try
- {
- uint64_t WriteOffset = 0;
- uint32_t NewBlockIndex = 0;
- for (uint32_t BlockIndex : BlocksToReWrite)
- {
- const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
-
- Ref<BlockStoreFile> OldBlockFile;
- {
- RwLock::SharedLockScope _i(m_InsertLock);
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- OldBlockFile = m_ChunkBlocks[BlockIndex];
- }
-
- if (!OldBlockFile)
- {
- // If the block file pointed to does not exist, move them all to deleted list
- BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(),
- BlockKeepChunks[ChunkMapIndex].begin(),
- BlockKeepChunks[ChunkMapIndex].end());
- BlockKeepChunks[ChunkMapIndex].clear();
- }
-
- const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex];
- if (KeepMap.empty())
- {
- const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
- for (size_t DeleteIndex : DeleteMap)
- {
- DeletedSize += ChunkLocations[DeleteIndex].Size;
- }
- ChangeCallback({}, DeleteMap);
- DeletedCount += DeleteMap.size();
- {
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- if (OldBlockFile)
- {
- m_ChunkBlocks[BlockIndex] = nullptr;
- ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
- m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
- OldBlockFile->MarkAsDeleteOnClose();
- }
- }
- continue;
- }
-
- ZEN_ASSERT(OldBlockFile);
-
- MovedChunksArray MovedChunks;
- std::vector<uint8_t> Chunk;
- for (const size_t& ChunkIndex : KeepMap)
- {
- const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
- Chunk.resize(ChunkLocation.Size);
- OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
-
- if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
- {
- uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
-
- if (NewBlockFile)
- {
- NewBlockFile->Flush();
- NewBlockFile = nullptr;
- }
- {
- ChangeCallback(MovedChunks, {});
- MovedCount += KeepMap.size();
- MovedChunks.clear();
- RwLock::ExclusiveLockScope __(m_InsertLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- if (m_ChunkBlocks.size() == m_MaxBlockCount)
- {
- ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
- m_BlocksBasePath,
- static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
- return;
- }
- while (m_ChunkBlocks.contains(NextBlockIndex))
- {
- NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1);
- }
- std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
- NewBlockFile = new BlockStoreFile(NewBlockPath);
- m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
- }
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
- return;
- }
- if (Space.Free < m_MaxBlockSize)
- {
- uint64_t ReclaimedSpace = DiskReserveCallback();
- if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
- {
- ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
- m_BlocksBasePath,
- m_MaxBlockSize,
- NiceBytes(Space.Free + ReclaimedSpace));
- RwLock::ExclusiveLockScope _l(m_InsertLock);
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- m_ChunkBlocks.erase(NextBlockIndex);
- return;
- }
-
- ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
- m_BlocksBasePath,
- ReclaimedSpace,
- NiceBytes(Space.Free + ReclaimedSpace));
- }
- NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
- }
-
- NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
- MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}});
- uint64_t OldOffset = WriteOffset;
- WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
- m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed);
- }
- Chunk.clear();
- if (NewBlockFile)
- {
- NewBlockFile->Flush();
- NewBlockFile = nullptr;
- }
-
- const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
- for (size_t DeleteIndex : DeleteMap)
- {
- DeletedSize += ChunkLocations[DeleteIndex].Size;
- }
-
- ChangeCallback(MovedChunks, DeleteMap);
- MovedCount += KeepMap.size();
- DeletedCount += DeleteMap.size();
- MovedChunks.clear();
- {
- RwLock::ExclusiveLockScope __(m_InsertLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- m_ChunkBlocks[BlockIndex] = nullptr;
- ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
- m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
- OldBlockFile->MarkAsDeleteOnClose();
- }
- }
- }
- catch (std::exception& ex)
- {
- ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what());
- if (NewBlockFile)
- {
- ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
- m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed);
- NewBlockFile->MarkAsDeleteOnClose();
- }
- }
-}
-
-void
-BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
- const IterateChunksSmallSizeCallback& SmallSizeCallback,
- const IterateChunksLargeSizeCallback& LargeSizeCallback)
-{
- std::vector<size_t> LocationIndexes;
- LocationIndexes.reserve(ChunkLocations.size());
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
- {
- LocationIndexes.push_back(ChunkIndex);
- }
- std::sort(LocationIndexes.begin(), LocationIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
- const BlockStoreLocation& LocationA = ChunkLocations[IndexA];
- const BlockStoreLocation& LocationB = ChunkLocations[IndexB];
- if (LocationA.BlockIndex < LocationB.BlockIndex)
- {
- return true;
- }
- else if (LocationA.BlockIndex > LocationB.BlockIndex)
- {
- return false;
- }
- return LocationA.Offset < LocationB.Offset;
- });
-
- IoBuffer ReadBuffer{ScrubSmallChunkWindowSize};
- void* BufferBase = ReadBuffer.MutableData();
-
- RwLock::SharedLockScope _(m_InsertLock);
-
- auto GetNextRange = [&](size_t StartIndexOffset) {
- size_t ChunkCount = 0;
- size_t StartIndex = LocationIndexes[StartIndexOffset];
- const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex];
- uint64_t StartOffset = StartLocation.Offset;
- while (StartIndexOffset + ChunkCount < LocationIndexes.size())
- {
- size_t NextIndex = LocationIndexes[StartIndexOffset + ChunkCount];
- const BlockStoreLocation& Location = ChunkLocations[NextIndex];
- if (Location.BlockIndex != StartLocation.BlockIndex)
- {
- break;
- }
- if ((Location.Offset + Location.Size) - StartOffset > ScrubSmallChunkWindowSize)
- {
- break;
- }
- ++ChunkCount;
- }
- return ChunkCount;
- };
-
- size_t LocationIndexOffset = 0;
- while (LocationIndexOffset < LocationIndexes.size())
- {
- size_t ChunkIndex = LocationIndexes[LocationIndexOffset];
- const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
-
- const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[FirstLocation.BlockIndex];
- if (!BlockFile)
- {
- while (ChunkLocations[ChunkIndex].BlockIndex == FirstLocation.BlockIndex)
- {
- SmallSizeCallback(ChunkIndex, nullptr, 0);
- LocationIndexOffset++;
- if (LocationIndexOffset == LocationIndexes.size())
- {
- break;
- }
- ChunkIndex = LocationIndexes[LocationIndexOffset];
- }
- continue;
- }
- size_t BlockSize = BlockFile->FileSize();
- size_t RangeCount = GetNextRange(LocationIndexOffset);
- if (RangeCount > 0)
- {
- size_t LastChunkIndex = LocationIndexes[LocationIndexOffset + RangeCount - 1];
- const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
- uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
- BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
- for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
- {
- size_t NextChunkIndex = LocationIndexes[LocationIndexOffset + RangeIndex];
- const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex];
- if (ChunkLocation.Size == 0 || (ChunkLocation.Offset + ChunkLocation.Size > BlockSize))
- {
- SmallSizeCallback(NextChunkIndex, nullptr, 0);
- continue;
- }
- void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
- SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size);
- }
- LocationIndexOffset += RangeCount;
- continue;
- }
- if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize))
- {
- SmallSizeCallback(ChunkIndex, nullptr, 0);
- LocationIndexOffset++;
- continue;
- }
- LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size);
- LocationIndexOffset++;
- }
-}
-
-const char*
-BlockStore::GetBlockFileExtension()
-{
- return ".ucas";
-}
-
-std::filesystem::path
-BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
-{
- ExtendablePathBuilder<256> Path;
-
- char BlockHexString[9];
- ToHexNumber(BlockIndex, BlockHexString);
-
- Path.Append(BlocksBasePath);
- Path.AppendSeparator();
- Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
- Path.AppendSeparator();
- Path.Append(BlockHexString);
- Path.Append(GetBlockFileExtension());
- return Path.ToPath();
-}
-
-#if ZEN_WITH_TESTS
-
-TEST_CASE("blockstore.blockstoredisklocation")
-{
- BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0};
- CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4));
-
- BlockStoreLocation MaxBlockIndex = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0};
- CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4));
-
- BlockStoreLocation MaxOffset = BlockStoreLocation{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0};
- CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4));
-
- BlockStoreLocation MaxSize = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits<uint32_t>::max()};
- CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4));
-
- BlockStoreLocation MaxBlockIndexAndOffset =
- BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0};
- CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4));
-
- BlockStoreLocation MaxAll = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
- .Offset = BlockStoreDiskLocation::MaxOffset * 4,
- .Size = std::numeric_limits<uint32_t>::max()};
- CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4));
-
- BlockStoreLocation MaxAll4096 = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
- .Offset = BlockStoreDiskLocation::MaxOffset * 4096,
- .Size = std::numeric_limits<uint32_t>::max()};
- CHECK(MaxAll4096 == BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096));
-
- BlockStoreLocation Middle = BlockStoreLocation{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2,
- .Offset = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4,
- .Size = std::numeric_limits<uint32_t>::max() / 2};
- CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4));
-}
-
-TEST_CASE("blockstore.blockfile")
-{
- ScopedTemporaryDirectory TempDir;
- auto RootDirectory = TempDir.Path() / "blocks";
- CreateDirectories(RootDirectory);
-
- {
- BlockStoreFile File1(RootDirectory / "1");
- File1.Create(16384);
- CHECK(File1.FileSize() == 0);
- File1.Write("data", 5, 0);
- IoBuffer DataChunk = File1.GetChunk(0, 5);
- File1.Write("boop", 5, 5);
- IoBuffer BoopChunk = File1.GetChunk(5, 5);
- const char* Data = static_cast<const char*>(DataChunk.GetData());
- CHECK(std::string(Data) == "data");
- const char* Boop = static_cast<const char*>(BoopChunk.GetData());
- CHECK(std::string(Boop) == "boop");
- File1.Flush();
- CHECK(File1.FileSize() == 10);
- }
- {
- BlockStoreFile File1(RootDirectory / "1");
- File1.Open();
-
- char DataRaw[5];
- File1.Read(DataRaw, 5, 0);
- CHECK(std::string(DataRaw) == "data");
- IoBuffer DataChunk = File1.GetChunk(0, 5);
-
- char BoopRaw[5];
- File1.Read(BoopRaw, 5, 5);
- CHECK(std::string(BoopRaw) == "boop");
-
- IoBuffer BoopChunk = File1.GetChunk(5, 5);
- const char* Data = static_cast<const char*>(DataChunk.GetData());
- CHECK(std::string(Data) == "data");
- const char* Boop = static_cast<const char*>(BoopChunk.GetData());
- CHECK(std::string(Boop) == "boop");
- }
-
- {
- IoBuffer DataChunk;
- IoBuffer BoopChunk;
-
- {
- BlockStoreFile File1(RootDirectory / "1");
- File1.Open();
- DataChunk = File1.GetChunk(0, 5);
- BoopChunk = File1.GetChunk(5, 5);
- }
-
- CHECK(std::filesystem::exists(RootDirectory / "1"));
-
- const char* Data = static_cast<const char*>(DataChunk.GetData());
- CHECK(std::string(Data) == "data");
- const char* Boop = static_cast<const char*>(BoopChunk.GetData());
- CHECK(std::string(Boop) == "boop");
- }
- CHECK(std::filesystem::exists(RootDirectory / "1"));
-
- {
- IoBuffer DataChunk;
- IoBuffer BoopChunk;
-
- {
- BlockStoreFile File1(RootDirectory / "1");
- File1.Open();
- File1.MarkAsDeleteOnClose();
- DataChunk = File1.GetChunk(0, 5);
- BoopChunk = File1.GetChunk(5, 5);
- }
-
- const char* Data = static_cast<const char*>(DataChunk.GetData());
- CHECK(std::string(Data) == "data");
- const char* Boop = static_cast<const char*>(BoopChunk.GetData());
- CHECK(std::string(Boop) == "boop");
- }
- CHECK(!std::filesystem::exists(RootDirectory / "1"));
-}
-
-namespace blockstore::impl {
- BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment)
- {
- BlockStoreLocation Location;
- Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; });
- CHECK(Location.Size == String.length());
- return Location;
- };
-
- std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location)
- {
- IoBuffer ChunkData = Store.TryGetChunk(Location);
- if (!ChunkData)
- {
- return "";
- }
- std::string AsString((const char*)ChunkData.Data(), ChunkData.Size());
- return AsString;
- };
-
- std::vector<std::filesystem::path> GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories)
- {
- DirectoryContent DirectoryContent;
- GetDirectoryContent(RootDir,
- DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) |
- (Directories ? DirectoryContent::IncludeDirsFlag : 0),
- DirectoryContent);
- std::vector<std::filesystem::path> Result;
- Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end());
- Result.insert(Result.end(), DirectoryContent.Files.begin(), DirectoryContent.Files.end());
- return Result;
- };
-
- static IoBuffer CreateChunk(uint64_t Size)
- {
- static std::random_device rd;
- static std::mt19937 g(rd());
-
- std::vector<uint8_t> Values;
- Values.resize(Size);
- for (size_t Idx = 0; Idx < Size; ++Idx)
- {
- Values[Idx] = static_cast<uint8_t>(Idx);
- }
- std::shuffle(Values.begin(), Values.end(), g);
-
- return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
- }
-} // namespace blockstore::impl
-
-TEST_CASE("blockstore.chunks")
-{
- using namespace blockstore::impl;
-
- ScopedTemporaryDirectory TempDir;
- auto RootDirectory = TempDir.Path();
-
- BlockStore Store;
- Store.Initialize(RootDirectory, 128, 1024, {});
- IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
- CHECK(!BadChunk);
-
- std::string FirstChunkData = "This is the data of the first chunk that we will write";
- BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
- std::string SecondChunkData = "This is the data for the second chunk that we will write";
- BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
-
- CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
- CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);
-
- std::string ThirdChunkData =
- "This is a much longer string that will not fit in the first block so it should be placed in the second block";
- BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4);
- CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex);
-
- CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
- CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);
- CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData);
-}
-
-TEST_CASE("blockstore.clean.stray.blocks")
-{
- using namespace blockstore::impl;
-
- ScopedTemporaryDirectory TempDir;
- auto RootDirectory = TempDir.Path();
-
- BlockStore Store;
- Store.Initialize(RootDirectory / "store", 128, 1024, {});
-
- std::string FirstChunkData = "This is the data of the first chunk that we will write";
- BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
- std::string SecondChunkData = "This is the data for the second chunk that we will write";
- BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
- std::string ThirdChunkData =
- "This is a much longer string that will not fit in the first block so it should be placed in the second block";
- WriteStringAsChunk(Store, ThirdChunkData, 4);
-
- Store.Close();
-
- // Not referencing the second block means that we should be deleted
- Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation});
-
- CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1);
-}
-
-TEST_CASE("blockstore.flush.forces.new.block")
-{
- using namespace blockstore::impl;
-
- ScopedTemporaryDirectory TempDir;
- auto RootDirectory = TempDir.Path();
-
- BlockStore Store;
- Store.Initialize(RootDirectory / "store", 128, 1024, {});
-
- std::string FirstChunkData = "This is the data of the first chunk that we will write";
- WriteStringAsChunk(Store, FirstChunkData, 4);
- Store.Flush();
- std::string SecondChunkData = "This is the data for the second chunk that we will write";
- WriteStringAsChunk(Store, SecondChunkData, 4);
- Store.Flush();
- std::string ThirdChunkData =
- "This is a much longer string that will not fit in the first block so it should be placed in the second block";
- WriteStringAsChunk(Store, ThirdChunkData, 4);
-
- CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3);
-}
-
-TEST_CASE("blockstore.iterate.chunks")
-{
- using namespace blockstore::impl;
-
- ScopedTemporaryDirectory TempDir;
- auto RootDirectory = TempDir.Path();
-
- BlockStore Store;
- Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {});
- IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
- CHECK(!BadChunk);
-
- std::string FirstChunkData = "This is the data of the first chunk that we will write";
- BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
-
- std::string SecondChunkData = "This is the data for the second chunk that we will write";
- BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
- Store.Flush();
-
- std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L');
- BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);
-
- BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0};
- BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0,
- .Offset = ScrubSmallChunkWindowSize,
- .Size = ScrubSmallChunkWindowSize * 2};
- BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024};
-
- Store.IterateChunks(
- {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex},
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- switch (ChunkIndex)
- {
- case 0:
- CHECK(Data);
- CHECK(Size == FirstChunkData.size());
- CHECK(std::string((const char*)Data, Size) == FirstChunkData);
- break;
- case 1:
- CHECK(Data);
- CHECK(Size == SecondChunkData.size());
- CHECK(std::string((const char*)Data, Size) == SecondChunkData);
- break;
- case 2:
- CHECK(false);
- break;
- case 3:
- CHECK(!Data);
- break;
- case 4:
- CHECK(!Data);
- break;
- case 5:
- CHECK(!Data);
- break;
- default:
- CHECK(false);
- break;
- }
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- switch (ChunkIndex)
- {
- case 0:
- case 1:
- CHECK(false);
- break;
- case 2:
- {
- CHECK(Size == VeryLargeChunk.size());
- char* Buffer = new char[Size];
- size_t HashOffset = 0;
- File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
- memcpy(&Buffer[HashOffset], Data, Size);
- HashOffset += Size;
- });
- CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
- delete[] Buffer;
- }
- break;
- case 3:
- CHECK(false);
- break;
- case 4:
- CHECK(false);
- break;
- case 5:
- CHECK(false);
- break;
- default:
- CHECK(false);
- break;
- }
- });
-}
-
-TEST_CASE("blockstore.reclaim.space")
-{
- using namespace blockstore::impl;
-
- ScopedTemporaryDirectory TempDir;
- auto RootDirectory = TempDir.Path();
-
- BlockStore Store;
- Store.Initialize(RootDirectory / "store", 512, 1024, {});
-
- constexpr size_t ChunkCount = 200;
- constexpr size_t Alignment = 8;
- std::vector<BlockStoreLocation> ChunkLocations;
- std::vector<IoHash> ChunkHashes;
- ChunkLocations.reserve(ChunkCount);
- ChunkHashes.reserve(ChunkCount);
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- IoBuffer Chunk = CreateChunk(57 + ChunkIndex);
-
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); });
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- std::vector<size_t> ChunksToKeep;
- ChunksToKeep.reserve(ChunkLocations.size());
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- ChunksToKeep.push_back(ChunkIndex);
- }
-
- Store.Flush();
- BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState();
- Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true);
-
- // If we keep all the chunks we should not get any callbacks on moved/deleted stuff
- Store.ReclaimSpace(
- State1,
- ChunkLocations,
- ChunksToKeep,
- Alignment,
- false,
- [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); },
- []() {
- CHECK(false);
- return 0;
- });
-
- size_t DeleteChunkCount = 38;
- ChunksToKeep.clear();
- for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- ChunksToKeep.push_back(ChunkIndex);
- }
-
- std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations;
- size_t MovedChunkCount = 0;
- size_t DeletedChunkCount = 0;
- Store.ReclaimSpace(
- State1,
- ChunkLocations,
- ChunksToKeep,
- Alignment,
- false,
- [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
- for (const auto& MovedChunk : MovedChunks)
- {
- CHECK(MovedChunk.first >= DeleteChunkCount);
- NewChunkLocations[MovedChunk.first] = MovedChunk.second;
- }
- MovedChunkCount += MovedChunks.size();
- for (size_t DeletedIndex : DeletedChunks)
- {
- CHECK(DeletedIndex < DeleteChunkCount);
- }
- DeletedChunkCount += DeletedChunks.size();
- },
- []() {
- CHECK(false);
- return 0;
- });
- CHECK(MovedChunkCount <= DeleteChunkCount);
- CHECK(DeletedChunkCount == DeleteChunkCount);
- ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end());
-
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]);
- if (ChunkIndex >= DeleteChunkCount)
- {
- IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- }
- }
-
- NewChunkLocations = ChunkLocations;
- MovedChunkCount = 0;
- DeletedChunkCount = 0;
- Store.ReclaimSpace(
- State1,
- ChunkLocations,
- {},
- Alignment,
- false,
- [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
- CHECK(MovedChunks.empty());
- DeletedChunkCount += DeletedChunks.size();
- },
- []() {
- CHECK(false);
- return 0;
- });
- CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount);
-}
-
-TEST_CASE("blockstore.thread.read.write")
-{
- using namespace blockstore::impl;
-
- ScopedTemporaryDirectory TempDir;
- auto RootDirectory = TempDir.Path();
-
- BlockStore Store;
- Store.Initialize(RootDirectory / "store", 1088, 1024, {});
-
- constexpr size_t ChunkCount = 1000;
- constexpr size_t Alignment = 8;
- std::vector<IoBuffer> Chunks;
- std::vector<IoHash> ChunkHashes;
- Chunks.reserve(ChunkCount);
- ChunkHashes.reserve(ChunkCount);
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2);
- Chunks.push_back(Chunk);
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- std::vector<BlockStoreLocation> ChunkLocations;
- ChunkLocations.resize(ChunkCount);
-
- WorkerThreadPool WorkerPool(8);
- std::atomic<size_t> WorkCompleted = 0;
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
-
- WorkCompleted = 0;
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
-
- std::vector<BlockStoreLocation> SecondChunkLocations;
- SecondChunkLocations.resize(ChunkCount);
- WorkCompleted = 0;
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
- SecondChunkLocations[ChunkIndex] = L;
- });
- WorkCompleted.fetch_add(1);
- });
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size() * 2)
- {
- Sleep(1);
- }
-}
-
-#endif
-
-void
-blockstore_forcelink()
-{
-}
-
-} // namespace zen
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp
deleted file mode 100644
index fdec78c60..000000000
--- a/zenstore/cas.cpp
+++ /dev/null
@@ -1,355 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "cas.h"
-
-#include "compactcas.h"
-#include "filecas.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/except.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/memory.h>
-#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
-#include <zencore/thread.h>
-#include <zencore/trace.h>
-#include <zencore/uid.h>
-#include <zenstore/cidstore.h>
-#include <zenstore/gc.h>
-#include <zenstore/scrubcontext.h>
-
-#include <gsl/gsl-lite.hpp>
-
-#include <filesystem>
-#include <functional>
-#include <unordered_map>
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-
-/**
- * CAS store implementation
- *
- * Uses a basic strategy of splitting payloads by size, to improve ability to reclaim space
- * quickly for unused large chunks and to maintain locality for small chunks which are
- * frequently accessed together.
- *
- */
-class CasImpl : public CasStore
-{
-public:
- CasImpl(GcManager& Gc);
- virtual ~CasImpl();
-
- virtual void Initialize(const CidStoreConfiguration& InConfig) override;
- virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
- virtual bool ContainsChunk(const IoHash& ChunkHash) override;
- virtual void FilterChunks(HashKeySet& InOutChunks) override;
- virtual void Flush() override;
- virtual void Scrub(ScrubContext& Ctx) override;
- virtual void GarbageCollect(GcContext& GcCtx) override;
- virtual CidStoreSize TotalSize() const override;
-
-private:
- CasContainerStrategy m_TinyStrategy;
- CasContainerStrategy m_SmallStrategy;
- FileCasStrategy m_LargeStrategy;
- CbObject m_ManifestObject;
-
- enum class StorageScheme
- {
- Legacy = 0,
- WithCbManifest = 1
- };
-
- StorageScheme m_StorageScheme = StorageScheme::Legacy;
-
- bool OpenOrCreateManifest();
- void UpdateManifest();
-};
-
-CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc)
-{
-}
-
-CasImpl::~CasImpl()
-{
-}
-
-void
-CasImpl::Initialize(const CidStoreConfiguration& InConfig)
-{
- m_Config = InConfig;
-
- ZEN_INFO("initializing CAS pool at '{}'", m_Config.RootDirectory);
-
- // Ensure root directory exists - create if it doesn't exist already
-
- std::filesystem::create_directories(m_Config.RootDirectory);
-
- // Open or create manifest
-
- const bool IsNewStore = OpenOrCreateManifest();
-
- // Initialize payload storage
-
- m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore);
- m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
- m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
-}
-
-bool
-CasImpl::OpenOrCreateManifest()
-{
- bool IsNewStore = false;
-
- std::filesystem::path ManifestPath = m_Config.RootDirectory;
- ManifestPath /= ".ucas_root";
-
- std::error_code Ec;
- BasicFile ManifestFile;
- ManifestFile.Open(ManifestPath.c_str(), BasicFile::Mode::kRead, Ec);
-
- bool ManifestIsOk = false;
-
- if (Ec)
- {
- if (Ec == std::errc::no_such_file_or_directory)
- {
- IsNewStore = true;
- }
- }
- else
- {
- IoBuffer ManifestBuffer = ManifestFile.ReadAll();
- ManifestFile.Close();
-
- if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#')
- {
- // Old-style manifest, does not contain any useful information, so we may as well update it
- }
- else
- {
- CbObject Manifest{SharedBuffer(ManifestBuffer)};
- CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All);
-
- if (ValidationResult == CbValidateError::None)
- {
- if (Manifest["id"])
- {
- ManifestIsOk = true;
- }
- }
- else
- {
- ZEN_WARN("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult));
- }
-
- if (ManifestIsOk)
- {
- m_ManifestObject = std::move(Manifest);
- }
- }
- }
-
- if (!ManifestIsOk)
- {
- UpdateManifest();
- }
-
- return IsNewStore;
-}
-
-void
-CasImpl::UpdateManifest()
-{
- if (!m_ManifestObject)
- {
- CbObjectWriter Cbo;
- Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now();
- m_ManifestObject = Cbo.Save();
- }
-
- // Write manifest to file
-
- std::filesystem::path ManifestPath = m_Config.RootDirectory;
- ManifestPath /= ".ucas_root";
-
- // This will throw on failure
-
- ZEN_TRACE("Writing new manifest to '{}'", ManifestPath);
-
- BasicFile Marker;
- Marker.Open(ManifestPath.c_str(), BasicFile::Mode::kTruncate);
- Marker.Write(m_ManifestObject.GetBuffer(), 0);
-}
-
-CasStore::InsertResult
-CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode)
-{
- ZEN_TRACE_CPU("CAS::InsertChunk");
-
- const uint64_t ChunkSize = Chunk.Size();
-
- if (ChunkSize < m_Config.TinyValueThreshold)
- {
- ZEN_ASSERT(ChunkSize);
-
- return m_TinyStrategy.InsertChunk(Chunk, ChunkHash);
- }
- else if (ChunkSize < m_Config.HugeValueThreshold)
- {
- return m_SmallStrategy.InsertChunk(Chunk, ChunkHash);
- }
-
- return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode);
-}
-
-IoBuffer
-CasImpl::FindChunk(const IoHash& ChunkHash)
-{
- ZEN_TRACE_CPU("CAS::FindChunk");
-
- if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash))
- {
- return Found;
- }
-
- if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash))
- {
- return Found;
- }
-
- if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash))
- {
- return Found;
- }
-
- // Not found
- return IoBuffer{};
-}
-
-bool
-CasImpl::ContainsChunk(const IoHash& ChunkHash)
-{
- return m_SmallStrategy.HaveChunk(ChunkHash) || m_TinyStrategy.HaveChunk(ChunkHash) || m_LargeStrategy.HaveChunk(ChunkHash);
-}
-
-void
-CasImpl::FilterChunks(HashKeySet& InOutChunks)
-{
- m_SmallStrategy.FilterChunks(InOutChunks);
- m_TinyStrategy.FilterChunks(InOutChunks);
- m_LargeStrategy.FilterChunks(InOutChunks);
-}
-
-void
-CasImpl::Flush()
-{
- m_SmallStrategy.Flush();
- m_TinyStrategy.Flush();
- m_LargeStrategy.Flush();
-}
-
-void
-CasImpl::Scrub(ScrubContext& Ctx)
-{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_SmallStrategy.Scrub(Ctx);
- m_TinyStrategy.Scrub(Ctx);
- m_LargeStrategy.Scrub(Ctx);
-}
-
-void
-CasImpl::GarbageCollect(GcContext& GcCtx)
-{
- m_SmallStrategy.CollectGarbage(GcCtx);
- m_TinyStrategy.CollectGarbage(GcCtx);
- m_LargeStrategy.CollectGarbage(GcCtx);
-}
-
-CidStoreSize
-CasImpl::TotalSize() const
-{
- const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize;
- const uint64_t Small = m_SmallStrategy.StorageSize().DiskSize;
- const uint64_t Large = m_LargeStrategy.StorageSize().DiskSize;
-
- return {.TinySize = Tiny, .SmallSize = Small, .LargeSize = Large, .TotalSize = Tiny + Small + Large};
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-std::unique_ptr<CasStore>
-CreateCasStore(GcManager& Gc)
-{
- return std::make_unique<CasImpl>(Gc);
-}
-
-//////////////////////////////////////////////////////////////////////////
-//
-// Testing related code follows...
-//
-
-#if ZEN_WITH_TESTS
-
-TEST_CASE("CasStore")
-{
- ScopedTemporaryDirectory TempDir;
-
- CidStoreConfiguration config;
- config.RootDirectory = TempDir.Path();
-
- GcManager Gc;
-
- std::unique_ptr<CasStore> Store = CreateCasStore(Gc);
- Store->Initialize(config);
-
- ScrubContext Ctx;
- Store->Scrub(Ctx);
-
- IoBuffer Value1{16};
- memcpy(Value1.MutableData(), "1234567890123456", 16);
- IoHash Hash1 = IoHash::HashBuffer(Value1.Data(), Value1.Size());
- CasStore::InsertResult Result1 = Store->InsertChunk(Value1, Hash1);
- CHECK(Result1.New);
-
- IoBuffer Value2{16};
- memcpy(Value2.MutableData(), "ABCDEFGHIJKLMNOP", 16);
- IoHash Hash2 = IoHash::HashBuffer(Value2.Data(), Value2.Size());
- CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2);
- CHECK(Result2.New);
-
- HashKeySet ChunkSet;
- ChunkSet.AddHashToSet(Hash1);
- ChunkSet.AddHashToSet(Hash2);
-
- Store->FilterChunks(ChunkSet);
- CHECK(ChunkSet.IsEmpty());
-
- IoBuffer Lookup1 = Store->FindChunk(Hash1);
- CHECK(Lookup1);
- IoBuffer Lookup2 = Store->FindChunk(Hash2);
- CHECK(Lookup2);
-}
-
-void
-CAS_forcelink()
-{
-}
-
-#endif
-
-} // namespace zen
diff --git a/zenstore/cas.h b/zenstore/cas.h
deleted file mode 100644
index 9c48d4707..000000000
--- a/zenstore/cas.h
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/blake3.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/refcount.h>
-#include <zencore/timer.h>
-#include <zenstore/cidstore.h>
-#include <zenstore/hashkeyset.h>
-
-#include <atomic>
-#include <filesystem>
-#include <functional>
-#include <memory>
-#include <string>
-#include <unordered_set>
-
-namespace zen {
-
-class GcContext;
-class GcManager;
-class ScrubContext;
-
-/** Content Addressable Storage interface
-
- */
-
-class CasStore
-{
-public:
- virtual ~CasStore() = default;
-
- const CidStoreConfiguration& Config() { return m_Config; }
-
- struct InsertResult
- {
- bool New = false;
- };
-
- enum class InsertMode
- {
- kCopyOnly,
- kMayBeMovedInPlace
- };
-
- virtual void Initialize(const CidStoreConfiguration& Config) = 0;
- virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
- virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
- virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
- virtual void Flush() = 0;
- virtual void Scrub(ScrubContext& Ctx) = 0;
- virtual void GarbageCollect(GcContext& GcCtx) = 0;
- virtual CidStoreSize TotalSize() const = 0;
-
-protected:
- CidStoreConfiguration m_Config;
- uint64_t m_LastScrubTime = 0;
-};
-
-ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc);
-
-void CAS_forcelink();
-
-} // namespace zen
diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp
deleted file mode 100644
index 2a978ae12..000000000
--- a/zenstore/caslog.cpp
+++ /dev/null
@@ -1,236 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenstore/caslog.h>
-
-#include "compactcas.h"
-
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/memory.h>
-#include <zencore/string.h>
-#include <zencore/thread.h>
-#include <zencore/uid.h>
-
-#include <xxhash.h>
-
-#include <gsl/gsl-lite.hpp>
-
-#include <filesystem>
-#include <functional>
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-
-uint32_t
-CasLogFile::FileHeader::ComputeChecksum()
-{
- return XXH32(&this->Magic, sizeof(FileHeader) - 4, 0xC0C0'BABA);
-}
-
-CasLogFile::CasLogFile()
-{
-}
-
-CasLogFile::~CasLogFile()
-{
-}
-
-bool
-CasLogFile::IsValid(std::filesystem::path FileName, size_t RecordSize)
-{
- if (!std::filesystem::is_regular_file(FileName))
- {
- return false;
- }
- BasicFile File;
-
- std::error_code Ec;
- File.Open(FileName, BasicFile::Mode::kRead, Ec);
- if (Ec)
- {
- return false;
- }
-
- FileHeader Header;
- if (File.FileSize() < sizeof(Header))
- {
- return false;
- }
-
- // Validate header and log contents and prepare for appending/replay
- File.Read(&Header, sizeof Header, 0);
-
- if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum()))
- {
- return false;
- }
- if (Header.RecordSize != RecordSize)
- {
- return false;
- }
- return true;
-}
-
-void
-CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode)
-{
- m_RecordSize = RecordSize;
-
- std::error_code Ec;
- BasicFile::Mode FileMode = BasicFile::Mode::kRead;
- switch (Mode)
- {
- case Mode::kWrite:
- FileMode = BasicFile::Mode::kWrite;
- break;
- case Mode::kTruncate:
- FileMode = BasicFile::Mode::kTruncate;
- break;
- }
-
- m_File.Open(FileName, FileMode, Ec);
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("Failed to open log file '{}'", FileName));
- }
-
- uint64_t AppendOffset = 0;
-
- if ((Mode == Mode::kTruncate) || (m_File.FileSize() < sizeof(FileHeader)))
- {
- if (Mode == Mode::kRead)
- {
- throw std::runtime_error(fmt::format("Mangled log header (file to small) in '{}'", FileName));
- }
- // Initialize log by writing header
- FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0};
- memcpy(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic);
- Header.Finalize();
-
- m_File.Write(&Header, sizeof Header, 0);
-
- AppendOffset = sizeof(FileHeader);
-
- m_Header = Header;
- }
- else
- {
- FileHeader Header;
- m_File.Read(&Header, sizeof Header, 0);
-
- if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum()))
- {
- throw std::runtime_error(fmt::format("Mangled log header (invalid header magic) in '{}'", FileName));
- }
- if (Header.RecordSize != RecordSize)
- {
- throw std::runtime_error(fmt::format("Mangled log header (mismatch in record size, expected {}, found {}) in '{}'",
- RecordSize,
- Header.RecordSize,
- FileName));
- }
-
- AppendOffset = m_File.FileSize();
-
- // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended
-
- AppendOffset -= sizeof Header;
- AppendOffset -= AppendOffset % RecordSize;
- AppendOffset += sizeof Header;
-
- m_Header = Header;
- }
-
- m_AppendOffset = AppendOffset;
-}
-
-void
-CasLogFile::Close()
-{
- // TODO: update header and maybe add trailer
- Flush();
-
- m_File.Close();
-}
-
-uint64_t
-CasLogFile::GetLogSize()
-{
- return m_File.FileSize();
-}
-
-uint64_t
-CasLogFile::GetLogCount()
-{
- uint64_t LogFileSize = m_AppendOffset.load(std::memory_order_acquire);
- if (LogFileSize < sizeof(FileHeader))
- {
- return 0;
- }
- const uint64_t LogBaseOffset = sizeof(FileHeader);
- const size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize;
- return LogEntryCount;
-}
-
-void
-CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount)
-{
- uint64_t LogFileSize = m_File.FileSize();
-
- // Ensure we end up on a clean boundary
- uint64_t LogBaseOffset = sizeof(FileHeader);
- size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize;
-
- if (LogEntryCount <= SkipEntryCount)
- {
- return;
- }
-
- LogBaseOffset += SkipEntryCount * m_RecordSize;
- LogEntryCount -= SkipEntryCount;
-
- // This should really be streaming the data rather than just
- // reading it into memory, though we don't tend to get very
- // large logs so it may not matter
-
- const uint64_t LogDataSize = LogEntryCount * m_RecordSize;
-
- std::vector<uint8_t> ReadBuffer;
- ReadBuffer.resize(LogDataSize);
-
- m_File.Read(ReadBuffer.data(), LogDataSize, LogBaseOffset);
-
- for (int i = 0; i < int(LogEntryCount); ++i)
- {
- Handler(ReadBuffer.data() + (i * m_RecordSize));
- }
-
- m_AppendOffset = LogBaseOffset + (m_RecordSize * LogEntryCount);
-}
-
-void
-CasLogFile::Append(const void* DataPointer, uint64_t DataSize)
-{
- ZEN_ASSERT((DataSize % m_RecordSize) == 0);
-
- uint64_t AppendOffset = m_AppendOffset.fetch_add(DataSize);
-
- std::error_code Ec;
- m_File.Write(DataPointer, gsl::narrow<uint32_t>(DataSize), AppendOffset, Ec);
-
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("Failed to write to log file '{}'", PathFromHandle(m_File.Handle())));
- }
-}
-
-void
-CasLogFile::Flush()
-{
- m_File.Flush();
-}
-
-} // namespace zen
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
deleted file mode 100644
index 5a5116faf..000000000
--- a/zenstore/cidstore.cpp
+++ /dev/null
@@ -1,125 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "zenstore/cidstore.h"
-
-#include <zencore/compress.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/string.h>
-#include <zenstore/scrubcontext.h>
-
-#include "cas.h"
-
-#include <filesystem>
-
-namespace zen {
-
-struct CidStore::Impl
-{
- Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {}
-
- CasStore& m_CasStore;
-
- void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); }
-
- CidStore::InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, CidStore::InsertMode Mode)
- {
-#ifndef NDEBUG
- IoHash VerifyRawHash;
- uint64_t _;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHash == VerifyRawHash);
-#endif // NDEBUG
- IoBuffer Payload(ChunkData);
- Payload.SetContentType(ZenContentType::kCompressedBinary);
-
- CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, RawHash, static_cast<CasStore::InsertMode>(Mode));
-
- return {.New = Result.New};
- }
-
- IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); }
-
- bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); }
-
- void FilterChunks(HashKeySet& InOutChunks)
- {
- InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); });
- }
-
- void Flush() { m_CasStore.Flush(); }
-
- void Scrub(ScrubContext& Ctx)
- {
- if (Ctx.ScrubTimestamp() == m_LastScrubTime)
- {
- return;
- }
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_CasStore.Scrub(Ctx);
- }
-
- uint64_t m_LastScrubTime = 0;
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore))
-{
-}
-
-CidStore::~CidStore()
-{
-}
-
-void
-CidStore::Initialize(const CidStoreConfiguration& Config)
-{
- m_Impl->Initialize(Config);
-}
-
-CidStore::InsertResult
-CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode)
-{
- return m_Impl->AddChunk(ChunkData, RawHash, Mode);
-}
-
-IoBuffer
-CidStore::FindChunkByCid(const IoHash& DecompressedId)
-{
- return m_Impl->FindChunkByCid(DecompressedId);
-}
-
-bool
-CidStore::ContainsChunk(const IoHash& DecompressedId)
-{
- return m_Impl->ContainsChunk(DecompressedId);
-}
-
-void
-CidStore::FilterChunks(HashKeySet& InOutChunks)
-{
- return m_Impl->FilterChunks(InOutChunks);
-}
-
-void
-CidStore::Flush()
-{
- m_Impl->Flush();
-}
-
-void
-CidStore::Scrub(ScrubContext& Ctx)
-{
- m_Impl->Scrub(Ctx);
-}
-
-CidStoreSize
-CidStore::TotalSize() const
-{
- return m_Impl->m_CasStore.TotalSize();
-}
-
-} // namespace zen
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
deleted file mode 100644
index 7b2c21b0f..000000000
--- a/zenstore/compactcas.cpp
+++ /dev/null
@@ -1,1511 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "compactcas.h"
-
-#include "cas.h"
-
-#include <zencore/compress.h>
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zenstore/scrubcontext.h>
-
-#include <gsl/gsl-lite.hpp>
-
-#include <xxhash.h>
-
-#if ZEN_WITH_TESTS
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-# include <zencore/workthreadpool.h>
-# include <zenstore/cidstore.h>
-# include <algorithm>
-# include <random>
-#endif
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-
-struct CasDiskIndexHeader
-{
- static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
- static constexpr uint32_t CurrentVersion = 1;
-
- uint32_t Magic = ExpectedMagic;
- uint32_t Version = CurrentVersion;
- uint64_t EntryCount = 0;
- uint64_t LogPosition = 0;
- uint32_t PayloadAlignment = 0;
- uint32_t Checksum = 0;
-
- static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header)
- {
- return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
- }
-};
-
-static_assert(sizeof(CasDiskIndexHeader) == 32);
-
-namespace {
- const char* IndexExtension = ".uidx";
- const char* LogExtension = ".ulog";
-
- std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return RootPath / ContainerBaseName;
- }
-
- std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension);
- }
-
- std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension);
- }
-
- std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension);
- }
-
- std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return GetBasePath(RootPath, ContainerBaseName) / "blocks";
- }
-
- bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason)
- {
- if (Entry.Key == IoHash::Zero)
- {
- OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
- return false;
- }
- if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0)
- {
- OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
- return false;
- }
- if (Entry.Flags & CasDiskIndexEntry::kTombstone)
- {
- return true;
- }
- if (Entry.ContentType != ZenContentType::kUnknownContentType)
- {
- OutReason =
- fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString());
- return false;
- }
- uint64_t Size = Entry.Location.GetSize();
- if (Size == 0)
- {
- OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
- return false;
- }
- return true;
- }
-
-} // namespace
-
-//////////////////////////////////////////////////////////////////////////
-
-CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas"))
-{
-}
-
-CasContainerStrategy::~CasContainerStrategy()
-{
-}
-
-void
-CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory,
- const std::string_view ContainerBaseName,
- uint32_t MaxBlockSize,
- uint64_t Alignment,
- bool IsNewStore)
-{
- ZEN_ASSERT(IsPow2(Alignment));
- ZEN_ASSERT(!m_IsInitialized);
- ZEN_ASSERT(MaxBlockSize > 0);
-
- m_RootDirectory = RootDirectory;
- m_ContainerBaseName = ContainerBaseName;
- m_PayloadAlignment = Alignment;
- m_MaxBlockSize = MaxBlockSize;
- m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName);
-
- OpenContainer(IsNewStore);
-
- m_IsInitialized = true;
-}
-
-CasStore::InsertResult
-CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
-{
- {
- RwLock::SharedLockScope _(m_LocationMapLock);
- if (m_LocationMap.contains(ChunkHash))
- {
- return CasStore::InsertResult{.New = false};
- }
- }
-
- // We can end up in a situation that InsertChunk writes the same chunk data in
- // different locations.
- // We release the insert lock once we have the correct WriteBlock ready and we know
- // where to write the data. If a new InsertChunk request for the same chunk hash/data
- // comes in before we update m_LocationMap below we will have a race.
- // The outcome of that is that we will write the chunk data in more than one location
- // but the chunk hash will only point to one of the chunks.
- // We will in that case waste space until the next GC operation.
- //
- // This should be a rare occasion and the current flow reduces the time we block for
- // reads, insert and GC.
-
- m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) {
- BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
- const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
- m_CasLog.Append(IndexEntry);
- {
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, DiskLocation);
- }
- });
-
- return CasStore::InsertResult{.New = true};
-}
-
-CasStore::InsertResult
-CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
-{
-#if !ZEN_WITH_TESTS
- ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
-#endif
- return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
-}
-
-IoBuffer
-CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
-{
- RwLock::SharedLockScope _(m_LocationMapLock);
- auto KeyIt = m_LocationMap.find(ChunkHash);
- if (KeyIt == m_LocationMap.end())
- {
- return IoBuffer();
- }
- const BlockStoreLocation& Location = KeyIt->second.Get(m_PayloadAlignment);
-
- IoBuffer Chunk = m_BlockStore.TryGetChunk(Location);
- return Chunk;
-}
-
-bool
-CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
-{
- RwLock::SharedLockScope _(m_LocationMapLock);
- return m_LocationMap.contains(ChunkHash);
-}
-
-void
-CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
-{
- // This implementation is good enough for relatively small
- // chunk sets (in terms of chunk identifiers), but would
- // benefit from a better implementation which removes
- // items incrementally for large sets, especially when
- // we're likely to already have a large proportion of the
- // chunks in the set
-
- InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
-}
-
-void
-CasContainerStrategy::Flush()
-{
- m_BlockStore.Flush();
- m_CasLog.Flush();
- MakeIndexSnapshot();
-}
-
-void
-CasContainerStrategy::Scrub(ScrubContext& Ctx)
-{
- std::vector<IoHash> BadKeys;
- uint64_t ChunkCount{0}, ChunkBytes{0};
- std::vector<BlockStoreLocation> ChunkLocations;
- std::vector<IoHash> ChunkIndexToChunkHash;
-
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- uint64_t TotalChunkCount = m_LocationMap.size();
- ChunkLocations.reserve(TotalChunkCount);
- ChunkIndexToChunkHash.reserve(TotalChunkCount);
- {
- for (const auto& Entry : m_LocationMap)
- {
- const IoHash& ChunkHash = Entry.first;
- const BlockStoreDiskLocation& DiskLocation = Entry.second;
- BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
-
- ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash.push_back(ChunkHash);
- }
- }
-
- const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
-
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- if (!Data)
- {
- // ChunkLocation out of range of stored blocks
- BadKeys.push_back(Hash);
- return;
- }
-
- IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- if (RawHash != Hash)
- {
- // Hash mismatch
- BadKeys.push_back(Hash);
- return;
- }
- return;
- }
-#if ZEN_WITH_TESTS
- IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
- if (ComputedHash == Hash)
- {
- return;
- }
-#endif
- BadKeys.push_back(Hash);
- };
-
- const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
-
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
-
- IoHash RawHash;
- uint64_t RawSize;
- // TODO: Add API to verify compressed buffer without having to memorymap the whole file
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- if (RawHash != Hash)
- {
- // Hash mismatch
- BadKeys.push_back(Hash);
- return;
- }
- return;
- }
-#if ZEN_WITH_TESTS
- IoHashStream Hasher;
- File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
- if (ComputedHash == Hash)
- {
- return;
- }
-#endif
- BadKeys.push_back(Hash);
- };
-
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
-
- _.ReleaseNow();
-
- Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
-
- if (!BadKeys.empty())
- {
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
-
- if (Ctx.RunRecovery())
- {
- // Deal with bad chunks by removing them from our lookup map
-
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(BadKeys.size());
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- for (const IoHash& ChunkHash : BadKeys)
- {
- const auto KeyIt = m_LocationMap.find(ChunkHash);
- if (KeyIt == m_LocationMap.end())
- {
- // Might have been GC'd
- continue;
- }
- LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
- m_LocationMap.erase(KeyIt);
- }
- }
- m_CasLog.Append(LogEntries);
- }
- }
-
- // Let whomever it concerns know about the bad chunks. This could
- // be used to invalidate higher level data structures more efficiently
- // than a full validation pass might be able to do
- Ctx.ReportBadCidChunks(BadKeys);
-
- ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
-}
-
-void
-CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
-{
- // It collects all the blocks that we want to delete chunks from. For each such
- // block we keep a list of chunks to retain and a list of chunks to delete.
- //
- // If there is a block that we are currently writing to, that block is omitted
- // from the garbage collection.
- //
- // Next it will iterate over all blocks that we want to remove chunks from.
- // If the block is empty after removal of chunks we mark the block as pending
- // delete - we want to delete it as soon as there are no IoBuffers using the
- // block file.
- // Once complete we update the m_LocationMap by removing the chunks.
- //
- // If the block is non-empty we write out the chunks we want to keep to a new
- // block file (creating new block files as needed).
- //
- // We update the index as we complete each new block file. This makes it possible
- // to break the GC if we want to limit time for execution.
- //
- // GC can very parallell to regular operation - it will block while taking
- // a snapshot of the current m_LocationMap state and while moving blocks it will
- // do a blocking operation and update the m_LocationMap after each new block is
- // written and figuring out the path to the next new block.
-
- ZEN_DEBUG("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName);
-
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
-
- LocationMap_t LocationMap;
- BlockStore::ReclaimSnapshotState BlockStoreState;
- {
- RwLock::SharedLockScope ___(m_LocationMapLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- LocationMap = m_LocationMap;
- BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
- }
-
- uint64_t TotalChunkCount = LocationMap.size();
-
- std::vector<IoHash> TotalChunkHashes;
- TotalChunkHashes.reserve(TotalChunkCount);
- for (const auto& Entry : LocationMap)
- {
- TotalChunkHashes.push_back(Entry.first);
- }
-
- std::vector<BlockStoreLocation> ChunkLocations;
- BlockStore::ChunkIndexArray KeepChunkIndexes;
- std::vector<IoHash> ChunkIndexToChunkHash;
- ChunkLocations.reserve(TotalChunkCount);
- ChunkIndexToChunkHash.reserve(TotalChunkCount);
-
- GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
- auto KeyIt = LocationMap.find(ChunkHash);
- const BlockStoreDiskLocation& DiskLocation = KeyIt->second;
- BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
- size_t ChunkIndex = ChunkLocations.size();
-
- ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
- if (Keep)
- {
- KeepChunkIndexes.push_back(ChunkIndex);
- }
- });
-
- const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
- if (!PerformDelete)
- {
- m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
- return;
- }
-
- std::vector<IoHash> DeletedChunks;
- m_BlockStore.ReclaimSpace(
- BlockStoreState,
- ChunkLocations,
- KeepChunkIndexes,
- m_PayloadAlignment,
- false,
- [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}});
- }
- for (const size_t ChunkIndex : RemovedChunks)
- {
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash];
- LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone});
- DeletedChunks.push_back(ChunkHash);
- }
-
- m_CasLog.Append(LogEntries);
- m_CasLog.Flush();
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- for (const CasDiskIndexEntry& Entry : LogEntries)
- {
- if (Entry.Flags & CasDiskIndexEntry::kTombstone)
- {
- m_LocationMap.erase(Entry.Key);
- continue;
- }
- m_LocationMap[Entry.Key] = Entry.Location;
- }
- }
- },
- [&GcCtx]() { return GcCtx.CollectSmallObjects(); });
-
- GcCtx.AddDeletedCids(DeletedChunks);
-}
-
-void
-CasContainerStrategy::MakeIndexSnapshot()
-{
- uint64_t LogCount = m_CasLog.GetLogCount();
- if (m_LogFlushPosition == LogCount)
- {
- return;
- }
-
- ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName);
- uint64_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
- m_RootDirectory / m_ContainerBaseName,
- EntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- namespace fs = std::filesystem;
-
- fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
- fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName);
-
- // Move index away, we keep it if something goes wrong
- if (fs::is_regular_file(TempIndexPath))
- {
- fs::remove(TempIndexPath);
- }
- if (fs::is_regular_file(IndexPath))
- {
- fs::rename(IndexPath, TempIndexPath);
- }
-
- try
- {
- // Write the current state of the location map to a new index state
- std::vector<CasDiskIndexEntry> Entries;
-
- {
- RwLock::SharedLockScope ___(m_LocationMapLock);
- Entries.resize(m_LocationMap.size());
-
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_LocationMap)
- {
- CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = Entry.second;
- }
- }
-
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
- CasDiskIndexHeader Header = {.EntryCount = Entries.size(),
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
-
- Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header);
-
- ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0);
- ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry));
- ObjectIndexFile.Flush();
- ObjectIndexFile.Close();
- EntryCount = Entries.size();
- m_LogFlushPosition = LogCount;
- }
- catch (std::exception& Err)
- {
- ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
-
- // Restore any previous snapshot
-
- if (fs::is_regular_file(TempIndexPath))
- {
- fs::remove(IndexPath);
- fs::rename(TempIndexPath, IndexPath);
- }
- }
- if (fs::is_regular_file(TempIndexPath))
- {
- fs::remove(TempIndexPath);
- }
-}
-
-uint64_t
-CasContainerStrategy::ReadIndexFile()
-{
- std::vector<CasDiskIndexEntry> Entries;
- std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
- if (std::filesystem::is_regular_file(IndexPath))
- {
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' index containing {} entries in {}",
- IndexPath,
- Entries.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CasDiskIndexHeader))
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry);
- CasDiskIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
- (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
- (Header.EntryCount <= ExpectedEntryCount))
- {
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
- m_PayloadAlignment = Header.PayloadAlignment;
-
- std::string InvalidEntryReason;
- for (const CasDiskIndexEntry& Entry : Entries)
- {
- if (!ValidateEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
- continue;
- }
- m_LocationMap[Entry.Key] = Entry.Location;
- }
-
- return Header.LogPosition;
- }
- else
- {
- ZEN_WARN("skipping invalid index file '{}'", IndexPath);
- }
- }
- }
- return 0;
-}
-
-uint64_t
-CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
-{
- std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
- if (std::filesystem::is_regular_file(LogPath))
- {
- size_t LogEntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' log containing {} entries in {}",
- m_RootDirectory / m_ContainerBaseName,
- LogEntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- TCasLogFile<CasDiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- if (CasLog.Initialize())
- {
- uint64_t EntryCount = CasLog.GetLogCount();
- if (EntryCount < SkipEntryCount)
- {
- ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
- SkipEntryCount = 0;
- }
- LogEntryCount = EntryCount - SkipEntryCount;
- CasLog.Replay(
- [&](const CasDiskIndexEntry& Record) {
- LogEntryCount++;
- std::string InvalidEntryReason;
- if (Record.Flags & CasDiskIndexEntry::kTombstone)
- {
- m_LocationMap.erase(Record.Key);
- return;
- }
- if (!ValidateEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
- return;
- }
- m_LocationMap[Record.Key] = Record.Location;
- },
- SkipEntryCount);
- return LogEntryCount;
- }
- }
- return 0;
-}
-
-void
-CasContainerStrategy::OpenContainer(bool IsNewStore)
-{
- // Add .running file and delete on clean on close to detect bad termination
-
- m_LocationMap.clear();
-
- std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName);
-
- if (IsNewStore)
- {
- std::filesystem::remove_all(BasePath);
- }
-
- m_LogFlushPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
-
- CreateDirectories(BasePath);
-
- std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
- m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
- std::vector<BlockStoreLocation> KnownLocations;
- KnownLocations.reserve(m_LocationMap.size());
- for (const auto& Entry : m_LocationMap)
- {
- const BlockStoreDiskLocation& Location = Entry.second;
- KnownLocations.push_back(Location.Get(m_PayloadAlignment));
- }
-
- m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
-
- if (IsNewStore || (LogEntryCount > 0))
- {
- MakeIndexSnapshot();
- }
-
- // TODO: should validate integrity of container files here
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-
-namespace {
- static IoBuffer CreateRandomChunk(uint64_t Size)
- {
- static std::random_device rd;
- static std::mt19937 g(rd());
-
- std::vector<uint8_t> Values;
- Values.resize(Size);
- for (size_t Idx = 0; Idx < Size; ++Idx)
- {
- Values[Idx] = static_cast<uint8_t>(Idx);
- }
- std::shuffle(Values.begin(), Values.end(), g);
-
- return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
- }
-} // namespace
-
-TEST_CASE("compactcas.hex")
-{
- uint32_t Value;
- std::string HexString;
- CHECK(!ParseHexNumber("", Value));
- char Hex[9];
-
- ToHexNumber(0u, Hex);
- HexString = std::string(Hex);
- CHECK(ParseHexNumber(HexString, Value));
- CHECK(Value == 0u);
-
- ToHexNumber(std::numeric_limits<std::uint32_t>::max(), Hex);
- HexString = std::string(Hex);
- CHECK(HexString == "ffffffff");
- CHECK(ParseHexNumber(HexString, Value));
- CHECK(Value == std::numeric_limits<std::uint32_t>::max());
-
- ToHexNumber(0xadf14711u, Hex);
- HexString = std::string(Hex);
- CHECK(HexString == "adf14711");
- CHECK(ParseHexNumber(HexString, Value));
- CHECK(Value == 0xadf14711u);
-
- ToHexNumber(0x80000000u, Hex);
- HexString = std::string(Hex);
- CHECK(HexString == "80000000");
- CHECK(ParseHexNumber(HexString, Value));
- CHECK(Value == 0x80000000u);
-
- ToHexNumber(0x718293a4u, Hex);
- HexString = std::string(Hex);
- CHECK(HexString == "718293a4");
- CHECK(ParseHexNumber(HexString, Value));
- CHECK(Value == 0x718293a4u);
-}
-
-TEST_CASE("compactcas.compact.gc")
-{
- ScopedTemporaryDirectory TempDir;
-
- const int kIterationCount = 1000;
-
- std::vector<IoHash> Keys(kIterationCount);
-
- {
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- CbObjectWriter Cbo;
- Cbo << "id" << i;
- CbObject Obj = Cbo.Save();
-
- IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
- const IoHash Hash = HashBuffer(ObjBuffer);
-
- Cas.InsertChunk(ObjBuffer, Hash);
-
- Keys[i] = Hash;
- }
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- IoBuffer Chunk = Cas.FindChunk(Keys[i]);
-
- CHECK(!!Chunk);
-
- CbObject Value = LoadCompactBinaryObject(Chunk);
-
- CHECK_EQ(Value["id"].AsInt32(), i);
- }
- }
-
- // Validate that we can still read the inserted data after closing
- // the original cas store
-
- {
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- IoBuffer Chunk = Cas.FindChunk(Keys[i]);
-
- CHECK(!!Chunk);
-
- CbObject Value = LoadCompactBinaryObject(Chunk);
-
- CHECK_EQ(Value["id"].AsInt32(), i);
- }
- }
-}
-
-TEST_CASE("compactcas.compact.totalsize")
-{
- std::random_device rd;
- std::mt19937 g(rd());
-
- // for (uint32_t i = 0; i < 100; ++i)
- {
- ScopedTemporaryDirectory TempDir;
-
- const uint64_t kChunkSize = 1024;
- const int32_t kChunkCount = 16;
-
- {
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
-
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- IoBuffer Chunk = CreateRandomChunk(kChunkSize);
- const IoHash Hash = HashBuffer(Chunk);
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
- ZEN_ASSERT(InsertResult.New);
- }
-
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
- }
-
- {
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
-
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
- }
-
- // Re-open again, this time we should have a snapshot
- {
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
-
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
- }
- }
-}
-
-TEST_CASE("compactcas.gc.basic")
-{
- ScopedTemporaryDirectory TempDir;
-
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);
-
- IoBuffer Chunk = CreateRandomChunk(128);
- IoHash ChunkHash = IoHash::HashBuffer(Chunk);
-
- const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
- CHECK(InsertResult.New);
- Cas.Flush();
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
-
- Cas.CollectGarbage(GcCtx);
-
- CHECK(!Cas.HaveChunk(ChunkHash));
-}
-
-TEST_CASE("compactcas.gc.removefile")
-{
- ScopedTemporaryDirectory TempDir;
-
- IoBuffer Chunk = CreateRandomChunk(128);
- IoHash ChunkHash = IoHash::HashBuffer(Chunk);
- {
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);
-
- const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
- CHECK(InsertResult.New);
- const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash);
- CHECK(!InsertResultDup.New);
- Cas.Flush();
- }
-
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false);
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
-
- Cas.CollectGarbage(GcCtx);
-
- CHECK(!Cas.HaveChunk(ChunkHash));
-}
-
-TEST_CASE("compactcas.gc.compact")
-{
- // for (uint32_t i = 0; i < 100; ++i)
- {
- ScopedTemporaryDirectory TempDir;
-
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true);
-
- uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(9);
- for (uint64_t Size : ChunkSizes)
- {
- Chunks.push_back(CreateRandomChunk(Size));
- }
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(9);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New);
- CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New);
- CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New);
- CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New);
- CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New);
- CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New);
- CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New);
- CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New);
- CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New);
-
- CHECK(Cas.HaveChunk(ChunkHashes[0]));
- CHECK(Cas.HaveChunk(ChunkHashes[1]));
- CHECK(Cas.HaveChunk(ChunkHashes[2]));
- CHECK(Cas.HaveChunk(ChunkHashes[3]));
- CHECK(Cas.HaveChunk(ChunkHashes[4]));
- CHECK(Cas.HaveChunk(ChunkHashes[5]));
- CHECK(Cas.HaveChunk(ChunkHashes[6]));
- CHECK(Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- // Keep first and last
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
-
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[0]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(Cas.HaveChunk(ChunkHashes[0]));
- CHECK(!Cas.HaveChunk(ChunkHashes[1]));
- CHECK(!Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(!Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(!Cas.HaveChunk(ChunkHashes[6]));
- CHECK(!Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
-
- Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
- Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
- Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
- }
-
- // Keep last
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(!Cas.HaveChunk(ChunkHashes[0]));
- CHECK(!Cas.HaveChunk(ChunkHashes[1]));
- CHECK(!Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(!Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(!Cas.HaveChunk(ChunkHashes[6]));
- CHECK(!Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
-
- Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
- Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
- Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
- }
-
- // Keep mixed
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[1]);
- KeepChunks.push_back(ChunkHashes[4]);
- KeepChunks.push_back(ChunkHashes[7]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(!Cas.HaveChunk(ChunkHashes[0]));
- CHECK(Cas.HaveChunk(ChunkHashes[1]));
- CHECK(!Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(!Cas.HaveChunk(ChunkHashes[6]));
- CHECK(Cas.HaveChunk(ChunkHashes[7]));
- CHECK(!Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
-
- Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
- Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
- Cas.InsertChunk(Chunks[8], ChunkHashes[8]);
- }
-
- // Keep multiple at end
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[6]);
- KeepChunks.push_back(ChunkHashes[7]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(!Cas.HaveChunk(ChunkHashes[0]));
- CHECK(!Cas.HaveChunk(ChunkHashes[1]));
- CHECK(!Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(!Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(Cas.HaveChunk(ChunkHashes[6]));
- CHECK(Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
-
- Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
- Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
- Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- }
-
- // Keep every other
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[0]);
- KeepChunks.push_back(ChunkHashes[2]);
- KeepChunks.push_back(ChunkHashes[4]);
- KeepChunks.push_back(ChunkHashes[6]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(Cas.HaveChunk(ChunkHashes[0]));
- CHECK(!Cas.HaveChunk(ChunkHashes[1]));
- CHECK(Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(Cas.HaveChunk(ChunkHashes[6]));
- CHECK(!Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
-
- Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
- }
-
- // Verify that we nicely appended blocks even after all GC operations
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
- CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5])));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
- }
-}
-
-TEST_CASE("compactcas.gc.deleteblockonopen")
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(20);
- for (uint64_t Size : ChunkSizes)
- {
- Chunks.push_back(CreateRandomChunk(Size));
- }
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(20);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- {
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);
-
- for (size_t i = 0; i < 20; i++)
- {
- CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
- }
-
- // GC every other block
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- for (size_t i = 0; i < 20; i += 2)
- {
- KeepChunks.push_back(ChunkHashes[i]);
- }
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- for (size_t i = 0; i < 20; i += 2)
- {
- CHECK(Cas.HaveChunk(ChunkHashes[i]));
- CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
- }
- }
- }
- {
- // Re-open
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 1024, 16, false);
-
- for (size_t i = 0; i < 20; i += 2)
- {
- CHECK(Cas.HaveChunk(ChunkHashes[i]));
- CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
- }
- }
-}
-
-TEST_CASE("compactcas.gc.handleopeniobuffer")
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(20);
- for (const uint64_t& Size : ChunkSizes)
- {
- Chunks.push_back(CreateRandomChunk(Size));
- }
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(20);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);
-
- for (size_t i = 0; i < 20; i++)
- {
- CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
- }
-
- IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]);
- Cas.Flush();
-
- // GC everything
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- Cas.CollectGarbage(GcCtx);
-
- for (size_t i = 0; i < 20; i++)
- {
- CHECK(!Cas.HaveChunk(ChunkHashes[i]));
- }
-
- CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk));
-}
-
-TEST_CASE("compactcas.threadedinsert")
-{
- // for (uint32_t i = 0; i < 100; ++i)
- {
- ScopedTemporaryDirectory TempDir;
-
- const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 4096;
- uint64_t ExpectedSize = 0;
-
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
- Chunks.reserve(kChunkCount);
-
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- while (true)
- {
- IoBuffer Chunk = CreateRandomChunk(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- if (Chunks.contains(Hash))
- {
- continue;
- }
- Chunks[Hash] = Chunk;
- ExpectedSize += Chunk.Size();
- break;
- }
- }
-
- std::atomic<size_t> WorkCompleted = 0;
- WorkerThreadPool ThreadPool(4);
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);
- {
- for (const auto& Chunk : Chunks)
- {
- const IoHash& Hash = Chunk.first;
- const IoBuffer& Buffer = Chunk.second;
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
- ZEN_ASSERT(InsertResult.New);
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
- }
-
- WorkCompleted = 0;
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_LE(ExpectedSize, TotalSize);
- CHECK_GE(ExpectedSize + 32768, TotalSize);
-
- {
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- IoHash Hash = IoHash::HashBuffer(Buffer);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
- }
-
- std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes;
- GcChunkHashes.reserve(Chunks.size());
- for (const auto& Chunk : Chunks)
- {
- GcChunkHashes.insert(Chunk.first);
- }
- {
- WorkCompleted = 0;
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
- NewChunks.reserve(kChunkCount);
-
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- IoBuffer Chunk = CreateRandomChunk(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- NewChunks[Hash] = Chunk;
- }
-
- std::atomic_uint32_t AddedChunkCount;
-
- for (const auto& Chunk : NewChunks)
- {
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Cas.InsertChunk(Chunk.second, Chunk.first);
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
- }
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- if (Buffer)
- {
- CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
- }
- WorkCompleted.fetch_add(1);
- });
- }
-
- while (AddedChunkCount.load() < NewChunks.size())
- {
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
- {
- if (Cas.HaveChunk(Chunk.first))
- {
- GcChunkHashes.emplace(Chunk.first);
- }
- }
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 155 == 0)
- {
- if (C < KeepHashes.size() - 1)
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- }
- C++;
- }
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- GcCtx.AddRetainedCids(KeepHashes);
- Cas.CollectGarbage(GcCtx);
- const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
- }
-
- while (WorkCompleted < NewChunks.size() + Chunks.size())
- {
- Sleep(1);
- }
-
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
- {
- if (Cas.HaveChunk(Chunk.first))
- {
- GcChunkHashes.emplace(Chunk.first);
- }
- }
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 155 == 0)
- {
- if (C < KeepHashes.size() - 1)
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- }
- C++;
- }
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- GcCtx.AddRetainedCids(KeepHashes);
- Cas.CollectGarbage(GcCtx);
- const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
- }
- {
- WorkCompleted = 0;
- for (const IoHash& ChunkHash : GcChunkHashes)
- {
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
- CHECK(Cas.HaveChunk(ChunkHash));
- CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < GcChunkHashes.size())
- {
- Sleep(1);
- }
- }
- }
-}
-
-#endif
-
-void
-compactcas_forcelink()
-{
-}
-
-} // namespace zen
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
deleted file mode 100644
index b0c6699eb..000000000
--- a/zenstore/compactcas.h
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/zencore.h>
-#include <zenstore/blockstore.h>
-#include <zenstore/caslog.h>
-#include <zenstore/gc.h>
-
-#include "cas.h"
-
-#include <atomic>
-#include <limits>
-#include <unordered_map>
-
-namespace spdlog {
-class logger;
-}
-
-namespace zen {
-
-//////////////////////////////////////////////////////////////////////////
-
-#pragma pack(push)
-#pragma pack(1)
-
-struct CasDiskIndexEntry
-{
- static const uint8_t kTombstone = 0x01;
-
- IoHash Key;
- BlockStoreDiskLocation Location;
- ZenContentType ContentType = ZenContentType::kUnknownContentType;
- uint8_t Flags = 0;
-};
-
-#pragma pack(pop)
-
-static_assert(sizeof(CasDiskIndexEntry) == 32);
-
-/** This implements a storage strategy for small CAS values
- *
- * New chunks are simply appended to a small object file, and an index is
- * maintained to allow chunks to be looked up within the active small object
- * files
- *
- */
-
-struct CasContainerStrategy final : public GcStorage
-{
- CasContainerStrategy(GcManager& Gc);
- ~CasContainerStrategy();
-
- CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
- IoBuffer FindChunk(const IoHash& ChunkHash);
- bool HaveChunk(const IoHash& ChunkHash);
- void FilterChunks(HashKeySet& InOutChunks);
- void Initialize(const std::filesystem::path& RootDirectory,
- const std::string_view ContainerBaseName,
- uint32_t MaxBlockSize,
- uint64_t Alignment,
- bool IsNewStore);
- void Flush();
- void Scrub(ScrubContext& Ctx);
- virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_BlockStore.TotalSize()}; }
-
-private:
- CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
- void MakeIndexSnapshot();
- uint64_t ReadIndexFile();
- uint64_t ReadLog(uint64_t SkipEntryCount);
- void OpenContainer(bool IsNewStore);
-
- spdlog::logger& Log() { return m_Log; }
-
- std::filesystem::path m_RootDirectory;
- spdlog::logger& m_Log;
- uint64_t m_PayloadAlignment = 1u << 4;
- uint64_t m_MaxBlockSize = 1u << 28;
- bool m_IsInitialized = false;
- TCasLogFile<CasDiskIndexEntry> m_CasLog;
- uint64_t m_LogFlushPosition = 0;
- std::string m_ContainerBaseName;
- std::filesystem::path m_BlocksBasePath;
- BlockStore m_BlockStore;
-
- RwLock m_LocationMapLock;
- typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t;
- LocationMap_t m_LocationMap;
-};
-
-void compactcas_forcelink();
-
-} // namespace zen
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
deleted file mode 100644
index 1d25920c4..000000000
--- a/zenstore/filecas.cpp
+++ /dev/null
@@ -1,1452 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "filecas.h"
-
-#include <zencore/compress.h>
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/memory.h>
-#include <zencore/scopeguard.h>
-#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
-#include <zencore/thread.h>
-#include <zencore/timer.h>
-#include <zencore/uid.h>
-#include <zenstore/gc.h>
-#include <zenstore/scrubcontext.h>
-#include <zenutil/basicfile.h>
-
-#if ZEN_WITH_TESTS
-# include <zencore/compactbinarybuilder.h>
-#endif
-
-#include <gsl/gsl-lite.hpp>
-
-#include <barrier>
-#include <filesystem>
-#include <functional>
-#include <unordered_map>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <xxhash.h>
-#if ZEN_PLATFORM_WINDOWS
-# include <atlfile.h>
-#endif
-ZEN_THIRD_PARTY_INCLUDES_END
-
-namespace zen {
-
-namespace filecas::impl {
- const char* IndexExtension = ".uidx";
- const char* LogExtension = ".ulog";
-
- std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", IndexExtension); }
-
- std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir)
- {
- return RootDir / fmt::format("cas.tmp{}", IndexExtension);
- }
-
- std::filesystem::path GetLogPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", LogExtension); }
-
-#pragma pack(push)
-#pragma pack(1)
-
- struct FileCasIndexHeader
- {
- static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
- static constexpr uint32_t CurrentVersion = 1;
-
- uint32_t Magic = ExpectedMagic;
- uint32_t Version = CurrentVersion;
- uint64_t EntryCount = 0;
- uint64_t LogPosition = 0;
- uint32_t Reserved = 0;
- uint32_t Checksum = 0;
-
- static uint32_t ComputeChecksum(const FileCasIndexHeader& Header)
- {
- return XXH32(&Header.Magic, sizeof(FileCasIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
- }
- };
-
- static_assert(sizeof(FileCasIndexHeader) == 32);
-
-#pragma pack(pop)
-
-} // namespace filecas::impl
-
-FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash)
-{
- ShardedPath.Append(RootPath.c_str());
-
- ExtendableStringBuilder<64> HashString;
- ChunkHash.ToHexString(HashString);
-
- const char* str = HashString.c_str();
-
- // Shard into a path with two directory levels containing 12 bits and 8 bits
- // respectively.
- //
- // This results in a maximum of 4096 * 256 directories
- //
- // The numbers have been chosen somewhat arbitrarily but are large to scale
- // to very large chunk repositories without creating too many directories
- // on a single level since NTFS does not deal very well with this.
- //
- // It may or may not make sense to make this a configurable policy, and it
- // would probably be a good idea to measure performance for different
- // policies and chunk counts
-
- ShardedPath.AppendSeparator();
- ShardedPath.AppendAsciiRange(str, str + 3);
-
- ShardedPath.AppendSeparator();
- ShardedPath.AppendAsciiRange(str + 3, str + 5);
- Shard2len = ShardedPath.Size();
-
- ShardedPath.AppendSeparator();
- ShardedPath.AppendAsciiRange(str + 5, str + 40);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas"))
-{
-}
-
-FileCasStrategy::~FileCasStrategy()
-{
-}
-
-void
-FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore)
-{
- using namespace filecas::impl;
-
- m_IsInitialized = true;
-
- m_RootDirectory = RootDirectory;
-
- m_Index.clear();
-
- std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
- std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory);
-
- if (IsNewStore)
- {
- std::filesystem::remove(LogPath);
- std::filesystem::remove(IndexPath);
-
- if (std::filesystem::is_directory(m_RootDirectory))
- {
- // We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside
- // in this folder as well
- struct Visitor : public FileSystemTraversal::TreeVisitor
- {
- virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override
- {
- // We don't care about files
- }
- static bool IsHexChar(std::filesystem::path::value_type C)
- {
- return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16];
- }
- virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
- [[maybe_unused]] const path_view& DirectoryName) override
- {
- if (DirectoryName.length() == 3)
- {
- if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2]))
- {
- ShardedRoots.push_back(Parent / DirectoryName);
- }
- }
- return false;
- }
- std::vector<std::filesystem::path> ShardedRoots;
- } CasVisitor;
-
- FileSystemTraversal Traversal;
- Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor);
- for (const std::filesystem::path& SharededRoot : CasVisitor.ShardedRoots)
- {
- std::filesystem::remove_all(SharededRoot);
- }
- }
- }
-
- m_LogFlushPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
- for (const auto& Entry : m_Index)
- {
- m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed);
- }
-
- CreateDirectories(m_RootDirectory);
- m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
- if (IsNewStore || LogEntryCount > 0)
- {
- MakeIndexSnapshot();
- }
-}
-
-#if ZEN_PLATFORM_WINDOWS
-static void
-DeletePayloadFileOnClose(const void* FileHandle)
-{
- const HANDLE WinFileHandle = (const HANDLE)FileHandle;
- // This will cause the file to be deleted when the last handle to it is closed
- FILE_DISPOSITION_INFO Fdi{};
- Fdi.DeleteFile = TRUE;
- BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
-
- if (!Success)
- {
- // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed
- // to delete it.
- ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'",
- PathFromHandle(WinFileHandle),
- GetLastErrorAsString());
- }
-}
-#endif
-
-CasStore::InsertResult
-FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode)
-{
- ZEN_ASSERT(m_IsInitialized);
-
-#if !ZEN_WITH_TESTS
- ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
-#endif
-
- if (Mode == CasStore::InsertMode::kCopyOnly)
- {
- {
- RwLock::SharedLockScope _(m_Lock);
- if (m_Index.contains(ChunkHash))
- {
- return CasStore::InsertResult{.New = false};
- }
- }
- return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
- }
-
- // File-based chunks have special case handling whereby we move the file into
- // place in the file store directory, thus avoiding unnecessary copying
-
- IoBufferFileReference FileRef;
- if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
- {
- {
- bool Exists = true;
- {
- RwLock::SharedLockScope _(m_Lock);
- Exists = m_Index.contains(ChunkHash);
- }
- if (Exists)
- {
-#if ZEN_PLATFORM_WINDOWS
- DeletePayloadFileOnClose(FileRef.FileHandle);
-#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle);
- if (unlink(FilePath.c_str()) < 0)
- {
- int UnlinkError = zen::GetLastError();
- if (UnlinkError != ENOENT)
- {
- ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
- FilePath.string(),
- GetSystemErrorAsString(UnlinkError));
- }
- }
-#endif
- return CasStore::InsertResult{.New = false};
- }
- }
-
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
-
- RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
-
-#if ZEN_PLATFORM_WINDOWS
- const HANDLE ChunkFileHandle = FileRef.FileHandle;
- // See if file already exists
- {
- CAtlFile PayloadFile;
-
- if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes))
- {
- // If we succeeded in opening the target file then we don't need to do anything else because it already exists
- // and should contain the content we were about to insert
-
- // We do need to ensure the source file goes away on close, however
- size_t ChunkSize = Chunk.GetSize();
- uint64_t FileSize = 0;
- if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
- {
- HashLock.ReleaseNow();
-
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
- }
- if (IsNew)
-
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
- }
-
- DeletePayloadFileOnClose(ChunkFileHandle);
-
- return CasStore::InsertResult{.New = IsNew};
- }
- else
- {
- ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
- Name.ShardedPath.ToUtf8(),
- ChunkSize,
- FileSize);
- }
- }
- else
- {
- if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
- {
- // Shard directory does not exist
- }
- else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND))
- {
- // Shard directory exists, but not the file
- }
- else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION))
- {
- // Sharing violation, likely because we are trying to open a file
- // which has been renamed on another thread, and the file handle
- // used to rename it is still open. We handle this case below
- // instead of here
- }
- else
- {
- ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes);
- }
- }
- }
-
- std::filesystem::path FullPath(Name.ShardedPath.c_str());
-
- std::filesystem::path FilePath = FullPath.parent_path();
- std::wstring FileName = FullPath.native();
-
- const DWORD BufferSize = sizeof(FILE_RENAME_INFO) + gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR));
- FILE_RENAME_INFO* RenameInfo = reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize));
- memset(RenameInfo, 0, BufferSize);
-
- RenameInfo->ReplaceIfExists = FALSE;
- RenameInfo->FileNameLength = gsl::narrow<DWORD>(FileName.size());
- memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR));
- RenameInfo->FileName[FileName.size()] = 0;
-
- auto $ = MakeGuard([&] { Memory::Free(RenameInfo); });
-
- // Try to move file into place
- BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
-
- if (!Success)
- {
- // The rename/move could fail because the target directory does not yet exist. This code attempts
- // to create it
-
- CAtlFile DirHandle;
-
- auto InternalCreateDirectoryHandle = [&] {
- return DirHandle.Create(FilePath.c_str(),
- GENERIC_READ | GENERIC_WRITE,
- FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
- OPEN_EXISTING,
- FILE_FLAG_BACKUP_SEMANTICS);
- };
-
- // It's possible for several threads to enter this logic trying to create the same
- // directory. Only one will create the directory of course, but all threads will
- // make it through okay
-
- HRESULT hRes = InternalCreateDirectoryHandle();
-
- if (FAILED(hRes))
- {
- // TODO: we can handle directory creation more intelligently and efficiently than
- // this currently does
-
- CreateDirectories(FilePath.c_str());
-
- hRes = InternalCreateDirectoryHandle();
- }
-
- if (FAILED(hRes))
- {
- ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath));
- }
-
- // Retry rename/move
-
- Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
- }
-
- if (Success)
- {
- m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
-
- HashLock.ReleaseNow();
-
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
-
- return CasStore::InsertResult{.New = IsNew};
- }
-
- const DWORD LastError = GetLastError();
-
- if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS))
- {
- HashLock.ReleaseNow();
- DeletePayloadFileOnClose(ChunkFileHandle);
-
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
-
- return CasStore::InsertResult{.New = IsNew};
- }
-
- ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
- GetSystemErrorAsString(LastError),
- ChunkHash);
-
- DeletePayloadFileOnClose(ChunkFileHandle);
-
-#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle);
- std::filesystem::path DestPath = Name.ShardedPath.c_str();
- int Ret = link(SourcePath.c_str(), DestPath.c_str());
- if (Ret < 0 && zen::GetLastError() == ENOENT)
- {
- // Destination directory doesn't exist. Create it any try again.
- CreateDirectories(DestPath.parent_path().c_str());
- Ret = link(SourcePath.c_str(), DestPath.c_str());
- }
- int LinkError = zen::GetLastError();
-
- if (unlink(SourcePath.c_str()) < 0)
- {
- int UnlinkError = zen::GetLastError();
- if (UnlinkError != ENOENT)
- {
- ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
- SourcePath.string(),
- GetSystemErrorAsString(UnlinkError));
- }
- }
-
- // It is possible that someone beat us to it in linking the file. In that
- // case a "file exists" error is okay. All others are not.
- if (Ret < 0)
- {
- if (LinkError == EEXIST)
- {
- HashLock.ReleaseNow();
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
- return CasStore::InsertResult{.New = IsNew};
- }
-
- ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
- GetSystemErrorAsString(LinkError),
- ChunkHash);
- }
- else
- {
- HashLock.ReleaseNow();
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
- return CasStore::InsertResult{.New = IsNew};
- }
-#endif // ZEN_PLATFORM_*
- }
-
- return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
-}
-
-CasStore::InsertResult
-FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
-{
- ZEN_ASSERT(m_IsInitialized);
-
- {
- RwLock::SharedLockScope _(m_Lock);
- if (m_Index.contains(ChunkHash))
- {
- return {.New = false};
- }
- }
-
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
-
- // See if file already exists
-
-#if ZEN_PLATFORM_WINDOWS
- CAtlFile PayloadFile;
-
- HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
-
- if (SUCCEEDED(hRes))
- {
- // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the
- // content we were about to insert
-
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
- }
- return CasStore::InsertResult{.New = IsNew};
- }
-
- PayloadFile.Close();
-#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- if (access(Name.ShardedPath.c_str(), F_OK) == 0)
- {
- return CasStore::InsertResult{.New = false};
- }
-#endif
-
- RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
-
-#if ZEN_PLATFORM_WINDOWS
- // For now, use double-checked locking to see if someone else was first
-
- hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
-
- if (SUCCEEDED(hRes))
- {
- uint64_t FileSize = 0;
- if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
- {
- // If we succeeded in opening the file then and the size is correct we don't need to do anything
- // else because someone else managed to create the file before we did. Just return.
-
- HashLock.ReleaseNow();
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
- }
- return CasStore::InsertResult{.New = IsNew};
- }
- else
- {
- ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
- Name.ShardedPath.ToUtf8(),
- ChunkSize,
- FileSize);
- }
- }
-
- if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)))
- {
- ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes));
- }
-
- auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };
-
- hRes = InternalCreateFile();
-
- if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
- {
- // Ensure parent directories exist and retry file creation
- CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len));
- hRes = InternalCreateFile();
- }
-
- if (FAILED(hRes))
- {
- ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8()));
- }
-#else
- // Attempt to exclusively create the file.
- auto InternalCreateFile = [&] {
- int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666);
- if (Fd >= 0)
- {
- fchmod(Fd, 0666);
- }
- return Fd;
- };
- int Fd = InternalCreateFile();
- if (Fd < 0)
- {
- switch (zen::GetLastError())
- {
- case EEXIST:
- // Another thread has beat us to it so we're golden.
- {
- HashLock.ReleaseNow();
-
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
- }
- return {.New = IsNew};
- }
- break;
-
- case ENOENT:
- if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len)))
- {
- Fd = InternalCreateFile();
- if (Fd >= 0)
- {
- break;
- }
- }
- ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath));
-
- default:
- ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8()));
- }
- }
-
- struct FdWrapper
- {
- ~FdWrapper() { Close(); }
- void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); }
- void Close()
- {
- if (Fd >= 0)
- {
- close(Fd);
- Fd = -1;
- }
- }
- int Fd;
- } PayloadFile = {Fd};
-#endif // ZEN_PLATFORM_WINDOWS
-
- size_t ChunkRemain = ChunkSize;
- auto ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData);
-
- while (ChunkRemain != 0)
- {
- uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain));
-
- PayloadFile.Write(ChunkCursor, ByteCount);
-
- ChunkCursor += ByteCount;
- ChunkRemain -= ByteCount;
- }
-
- // We cannot rely on RAII to close the file handle since it would be closed
- // *after* the lock is released due to the initialization order
- PayloadFile.Close();
-
- m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});
-
- HashLock.ReleaseNow();
-
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
- }
-
- return {.New = IsNew};
-}
-
-IoBuffer
-FileCasStrategy::FindChunk(const IoHash& ChunkHash)
-{
- ZEN_ASSERT(m_IsInitialized);
-
- {
- RwLock::SharedLockScope _(m_Lock);
- if (!m_Index.contains(ChunkHash))
- {
- return {};
- }
- }
-
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
-
- RwLock::SharedLockScope _(LockForHash(ChunkHash));
-
- return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
-}
-
-bool
-FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
-{
- ZEN_ASSERT(m_IsInitialized);
-
- RwLock::SharedLockScope _(m_Lock);
- return m_Index.contains(ChunkHash);
-}
-
-void
-FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
-{
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
-
- uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec));
- if (Ec)
- {
- ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8());
- FileSize = 0;
- }
-
- ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize));
- std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
-
- if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str()))
- {
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
- {
- m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed);
- m_Index.erase(It);
- }
- }
- m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize});
- }
-}
-
-void
-FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
-{
- ZEN_ASSERT(m_IsInitialized);
-
- // NOTE: it's not a problem now, but in the future if a GC should happen while this
- // is in flight, the result could be wrong since chunks could go away in the meantime.
- //
- // It would be good to have a pinning mechanism to make this less likely but
- // given that chunks could go away at any point after the results are returned to
- // a caller, this is something which needs to be taken into account by anyone consuming
- // this functionality in any case
-
- InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
-}
-
-void
-FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback)
-{
- ZEN_ASSERT(m_IsInitialized);
-
- RwLock::SharedLockScope _(m_Lock);
- for (const auto& It : m_Index)
- {
- const IoHash& NameHash = It.first;
- ShardingHelper Name(m_RootDirectory.c_str(), NameHash);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
- Callback(NameHash, std::move(Payload));
- }
-}
-
-void
-FileCasStrategy::Flush()
-{
- // Since we don't keep files open after writing there's nothing specific
- // to flush here.
- //
- // Depending on what semantics we want Flush() to provide, it could be
- // argued that this should just flush the volume which we are using to
- // store the CAS files on here, to ensure metadata is flushed along
- // with file data
- //
- // Related: to facilitate more targeted validation during recovery we could
- // maintain a log of when chunks were created
-}
-
-void
-FileCasStrategy::Scrub(ScrubContext& Ctx)
-{
- ZEN_ASSERT(m_IsInitialized);
-
- std::vector<IoHash> BadHashes;
- uint64_t ChunkCount{0}, ChunkBytes{0};
-
- {
- std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
- RwLock::ExclusiveLockScope _(m_Lock);
- for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
- {
- if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed);
- m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size});
- }
- }
- }
-
- IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
- if (!Payload)
- {
- BadHashes.push_back(Hash);
- return;
- }
- ++ChunkCount;
- ChunkBytes += Payload.GetSize();
-
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize))
- {
- if (RawHash != Hash)
- {
- // Hash mismatch
- BadHashes.push_back(Hash);
- return;
- }
- return;
- }
-#if ZEN_WITH_TESTS
- IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload))));
- if (ComputedHash == Hash)
- {
- return;
- }
-#endif
- BadHashes.push_back(Hash);
- });
-
- Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
-
- if (!BadHashes.empty())
- {
- ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size());
-
- if (Ctx.RunRecovery())
- {
- ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size());
-
- for (const IoHash& Hash : BadHashes)
- {
- std::error_code Ec;
- DeleteChunk(Hash, Ec);
-
- if (Ec)
- {
- ZEN_WARN("failed to delete file for chunk {}", Hash);
- }
- }
- }
- }
-
- // Let whomever it concerns know about the bad chunks. This could
- // be used to invalidate higher level data structures more efficiently
- // than a full validation pass might be able to do
- Ctx.ReportBadCidChunks(BadHashes);
-
- ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
-}
-
-void
-FileCasStrategy::CollectGarbage(GcContext& GcCtx)
-{
- ZEN_ASSERT(m_IsInitialized);
-
- ZEN_DEBUG("collecting garbage from {}", m_RootDirectory);
-
- std::vector<IoHash> ChunksToDelete;
- std::atomic<uint64_t> ChunksToDeleteBytes{0};
- std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
-
- std::vector<IoHash> CandidateCas;
- CandidateCas.resize(1);
-
- uint64_t DeletedCount = 0;
- uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
-
- Stopwatch TotalTimer;
- const auto _ = MakeGuard([&] {
- ZEN_DEBUG("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}",
- m_RootDirectory,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- DeletedCount,
- ChunkCount,
- NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)),
- NiceBytes(OldTotalSize));
- });
-
- IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
- bool KeepThis = false;
- CandidateCas[0] = Hash;
- GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) {
- ZEN_UNUSED(Hash);
- KeepThis = true;
- });
-
- const uint64_t FileSize = Payload.GetSize();
-
- if (!KeepThis)
- {
- ChunksToDelete.push_back(Hash);
- ChunksToDeleteBytes.fetch_add(FileSize);
- }
-
- ++ChunkCount;
- ChunkBytes.fetch_add(FileSize);
- });
-
- // TODO, any entires we did not encounter during our IterateChunks should be removed from the index
-
- if (ChunksToDelete.empty())
- {
- ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory);
- return;
- }
-
- ZEN_DEBUG("deleting file CAS garbage for '{}': {} out of {} chunks ({})",
- m_RootDirectory,
- ChunksToDelete.size(),
- ChunkCount.load(),
- NiceBytes(ChunksToDeleteBytes));
-
- if (GcCtx.IsDeletionMode() == false)
- {
- ZEN_DEBUG("NOTE: not actually deleting anything since deletion is disabled");
-
- return;
- }
-
- for (const IoHash& Hash : ChunksToDelete)
- {
- ZEN_TRACE("deleting chunk {}", Hash);
-
- std::error_code Ec;
- DeleteChunk(Hash, Ec);
-
- if (Ec)
- {
- ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message());
- continue;
- }
- DeletedCount++;
- }
-
- GcCtx.AddDeletedCids(ChunksToDelete);
-}
-
-bool
-FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason)
-{
- if (Entry.Key == IoHash::Zero)
- {
- OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
- return false;
- }
- if (Entry.Flags & (~FileCasIndexEntry::kTombStone))
- {
- OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
- return false;
- }
- if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
- {
- return true;
- }
- uint64_t Size = Entry.Size;
- if (Size == 0)
- {
- OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
- return false;
- }
- return true;
-}
-
-void
-FileCasStrategy::MakeIndexSnapshot()
-{
- using namespace filecas::impl;
-
- uint64_t LogCount = m_CasLog.GetLogCount();
- if (m_LogFlushPosition == LogCount)
- {
- return;
- }
- ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory);
- uint64_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
- m_RootDirectory,
- EntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- namespace fs = std::filesystem;
-
- fs::path IndexPath = GetIndexPath(m_RootDirectory);
- fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory);
-
- // Move index away, we keep it if something goes wrong
- if (fs::is_regular_file(STmpIndexPath))
- {
- fs::remove(STmpIndexPath);
- }
- if (fs::is_regular_file(IndexPath))
- {
- fs::rename(IndexPath, STmpIndexPath);
- }
-
- try
- {
- // Write the current state of the location map to a new index state
- std::vector<FileCasIndexEntry> Entries;
-
- {
- Entries.resize(m_Index.size());
-
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_Index)
- {
- FileCasIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Size = Entry.second.Size;
- }
- }
-
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
- filecas::impl::FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount};
-
- Header.Checksum = filecas::impl::FileCasIndexHeader::ComputeChecksum(Header);
-
- ObjectIndexFile.Write(&Header, sizeof(filecas::impl::FileCasIndexHeader), 0);
- ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(filecas::impl::FileCasIndexHeader));
- ObjectIndexFile.Flush();
- ObjectIndexFile.Close();
- EntryCount = Entries.size();
- m_LogFlushPosition = LogCount;
- }
- catch (std::exception& Err)
- {
- ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
-
- // Restore any previous snapshot
-
- if (fs::is_regular_file(STmpIndexPath))
- {
- fs::remove(IndexPath);
- fs::rename(STmpIndexPath, IndexPath);
- }
- }
- if (fs::is_regular_file(STmpIndexPath))
- {
- fs::remove(STmpIndexPath);
- }
-}
-uint64_t
-FileCasStrategy::ReadIndexFile()
-{
- using namespace filecas::impl;
-
- std::vector<FileCasIndexEntry> Entries;
- std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory);
- if (std::filesystem::is_regular_file(IndexPath))
- {
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' index containing {} entries in {}",
- IndexPath,
- Entries.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(FileCasIndexHeader))
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(FileCasIndexHeader))) / sizeof(FileCasIndexEntry);
- FileCasIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) &&
- (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount))
- {
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader));
-
- std::string InvalidEntryReason;
- for (const FileCasIndexEntry& Entry : Entries)
- {
- if (!ValidateEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
- continue;
- }
- m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
- }
-
- return Header.LogPosition;
- }
- else
- {
- ZEN_WARN("skipping invalid index file '{}'", IndexPath);
- }
- }
- return 0;
- }
-
- if (std::filesystem::is_directory(m_RootDirectory))
- {
- ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory);
- TCasLogFile<FileCasIndexEntry> CasLog;
- uint64_t TotalSize = 0;
- Stopwatch TotalTimer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}",
- m_RootDirectory,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- CasLog.GetLogCount(),
- NiceBytes(TotalSize));
- });
-
- std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
-
- std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
- CasLog.Open(LogPath, CasLogFile::Mode::kTruncate);
- std::string InvalidEntryReason;
- for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
- {
- if (!ValidateEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason);
- continue;
- }
- m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
- CasLog.Append(Entry);
- }
-
- CasLog.Close();
- }
-
- return 0;
-}
-
-uint64_t
-FileCasStrategy::ReadLog(uint64_t SkipEntryCount)
-{
- using namespace filecas::impl;
-
- std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
- if (std::filesystem::is_regular_file(LogPath))
- {
- uint64_t LogEntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- TCasLogFile<FileCasIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- if (CasLog.Initialize())
- {
- uint64_t EntryCount = CasLog.GetLogCount();
- if (EntryCount < SkipEntryCount)
- {
- ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
- SkipEntryCount = 0;
- }
- LogEntryCount = EntryCount - SkipEntryCount;
- m_Index.reserve(LogEntryCount);
- uint64_t InvalidEntryCount = 0;
- CasLog.Replay(
- [&](const FileCasIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Flags & FileCasIndexEntry::kTombStone)
- {
- m_Index.erase(Record.Key);
- return;
- }
- if (!ValidateEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
- ++InvalidEntryCount;
- return;
- }
- m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size});
- },
- SkipEntryCount);
- if (InvalidEntryCount)
- {
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, LogPath);
- }
- return LogEntryCount;
- }
- }
- return 0;
-}
-
-std::vector<FileCasStrategy::FileCasIndexEntry>
-FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir)
-{
- using namespace filecas::impl;
-
- std::vector<FileCasIndexEntry> Entries;
- struct Visitor : public FileSystemTraversal::TreeVisitor
- {
- Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {}
- virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override
- {
- std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory);
-
- std::filesystem::path::string_type PathString = RelPath.native();
-
- if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2)))
- {
- if (PathString.at(3) == std::filesystem::path::preferred_separator)
- {
- PathString.erase(3, 1);
- }
- PathString.append(File);
-
- // TODO: should validate that we're actually dealing with a valid hex string here
-#if ZEN_PLATFORM_WINDOWS
- StringBuilder<64> Utf8;
- WideToUtf8(PathString, Utf8);
- IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()});
-#else
- IoHash NameHash = IoHash::FromHexString(PathString);
-#endif
- Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize});
- }
- }
-
- virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
- [[maybe_unused]] const path_view& DirectoryName) override
- {
- return true;
- }
-
- const std::filesystem::path& RootDirectory;
- std::vector<FileCasIndexEntry>& Entries;
- } CasVisitor{RootDir, Entries};
-
- FileSystemTraversal Traversal;
- Traversal.TraverseFileSystem(RootDir, CasVisitor);
- return Entries;
-};
-
- //////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-
-TEST_CASE("cas.file.move")
-{
- // specifying an absolute path here can be helpful when using procmon to dig into things
- ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
-
- GcManager Gc;
-
- FileCasStrategy FileCas(Gc);
- FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
-
- {
- std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
-
- IoBuffer ZeroBytes{1024 * 1024};
- IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes);
-
- BasicFile PayloadFile;
- PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate);
- PayloadFile.Write(ZeroBytes, 0);
- PayloadFile.Close();
-
- IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path);
-
- CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash);
- CHECK_EQ(Result.New, true);
- }
-
-# if 0
- SUBCASE("stresstest")
- {
- std::vector<IoHash> PayloadHashes;
-
- const int kWorkers = 64;
- const int kItemCount = 128;
-
- for (int w = 0; w < kWorkers; ++w)
- {
- for (int i = 0; i < kItemCount; ++i)
- {
- IoBuffer Payload{1024};
- *reinterpret_cast<int*>(Payload.MutableData()) = i;
- PayloadHashes.push_back(IoHash::HashBuffer(Payload));
-
- std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
- WriteFile(PayloadPath, Payload);
- }
- }
-
- std::barrier Sync{kWorkers};
-
- auto PopulateAll = [&](int w) {
- std::vector<IoBuffer> Buffers;
-
- for (int i = 0; i < kItemCount; ++i)
- {
- std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
- IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath);
- Buffers.push_back(Payload);
- Sync.arrive_and_wait();
- CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]);
- }
- };
-
- std::vector<std::jthread> Threads;
-
- for (int i = 0; i < kWorkers; ++i)
- {
- Threads.push_back(std::jthread(PopulateAll, i));
- }
-
- for (std::jthread& Thread : Threads)
- {
- Thread.join();
- }
- }
-# endif
-}
-
-TEST_CASE("cas.file.gc")
-{
- // specifying an absolute path here can be helpful when using procmon to dig into things
- ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
-
- GcManager Gc;
- FileCasStrategy FileCas(Gc);
- FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
-
- const int kIterationCount = 1000;
- std::vector<IoHash> Keys{kIterationCount};
-
- auto InsertChunks = [&] {
- for (int i = 0; i < kIterationCount; ++i)
- {
- CbObjectWriter Cbo;
- Cbo << "id" << i;
- CbObject Obj = Cbo.Save();
-
- IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
- IoHash Hash = HashBuffer(ObjBuffer);
-
- FileCas.InsertChunk(ObjBuffer, Hash);
-
- Keys[i] = Hash;
- }
- };
-
- // Drop everything
-
- {
- InsertChunks();
-
- GcContext Ctx(GcClock::Now() - std::chrono::hours(24));
- FileCas.CollectGarbage(Ctx);
-
- for (const IoHash& Key : Keys)
- {
- IoBuffer Chunk = FileCas.FindChunk(Key);
-
- CHECK(!Chunk);
- }
- }
-
- // Keep roughly half of the chunks
-
- {
- InsertChunks();
-
- GcContext Ctx(GcClock::Now() - std::chrono::hours(24));
-
- for (const IoHash& Key : Keys)
- {
- if (Key.Hash[0] & 1)
- {
- Ctx.AddRetainedCids(std::vector<IoHash>{Key});
- }
- }
-
- FileCas.CollectGarbage(Ctx);
-
- for (const IoHash& Key : Keys)
- {
- if (Key.Hash[0] & 1)
- {
- CHECK(FileCas.FindChunk(Key));
- }
- else
- {
- CHECK(!FileCas.FindChunk(Key));
- }
- }
- }
-}
-
-#endif
-
-void
-filecas_forcelink()
-{
-}
-
-} // namespace zen
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
deleted file mode 100644
index 420b3a634..000000000
--- a/zenstore/filecas.h
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/zencore.h>
-
-#include <zencore/filesystem.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/thread.h>
-#include <zenstore/caslog.h>
-#include <zenstore/gc.h>
-
-#include "cas.h"
-
-#include <atomic>
-#include <functional>
-
-namespace spdlog {
-class logger;
-}
-
-namespace zen {
-
-class BasicFile;
-
-/** CAS storage strategy using a file-per-chunk storage strategy
- */
-
-struct FileCasStrategy final : public GcStorage
-{
- FileCasStrategy(GcManager& Gc);
- ~FileCasStrategy();
-
- void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore);
- CasStore::InsertResult InsertChunk(IoBuffer Chunk,
- const IoHash& ChunkHash,
- CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace);
- IoBuffer FindChunk(const IoHash& ChunkHash);
- bool HaveChunk(const IoHash& ChunkHash);
- void FilterChunks(HashKeySet& InOutChunks);
- void Flush();
- void Scrub(ScrubContext& Ctx);
- virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; }
-
-private:
- void MakeIndexSnapshot();
- uint64_t ReadIndexFile();
- uint64_t ReadLog(uint64_t LogPosition);
-
- struct IndexEntry
- {
- uint64_t Size = 0;
- };
- using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>;
-
- CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
-
- std::filesystem::path m_RootDirectory;
- RwLock m_Lock;
- IndexMap m_Index;
- RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
- spdlog::logger& m_Log;
- spdlog::logger& Log() { return m_Log; }
- std::atomic_uint64_t m_TotalSize{};
- bool m_IsInitialized = false;
-
- struct FileCasIndexEntry
- {
- static const uint32_t kTombStone = 0x0000'0001;
-
- bool IsFlagSet(const uint32_t Flag) const { return (Flags & kTombStone) == Flag; }
-
- IoHash Key;
- uint32_t Flags = 0;
- uint64_t Size = 0;
- };
- static bool ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason);
- static std::vector<FileCasStrategy::FileCasIndexEntry> ScanFolderForCasFiles(const std::filesystem::path& RootDir);
-
- static_assert(sizeof(FileCasIndexEntry) == 32);
-
- TCasLogFile<FileCasIndexEntry> m_CasLog;
- uint64_t m_LogFlushPosition = 0;
-
- inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
- void IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback);
- void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec);
-
- struct ShardingHelper
- {
- ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash);
-
- size_t Shard2len = 0;
- ExtendablePathBuilder<128> ShardedPath;
- };
-};
-
-void filecas_forcelink();
-
-} // namespace zen
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
deleted file mode 100644
index 370c3c965..000000000
--- a/zenstore/gc.cpp
+++ /dev/null
@@ -1,1312 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenstore/gc.h>
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
-#include <zencore/timer.h>
-#include <zenstore/cidstore.h>
-
-#include "cas.h"
-
-#include <fmt/format.h>
-#include <filesystem>
-
-#if ZEN_PLATFORM_WINDOWS
-# include <zencore/windows.h>
-#else
-# include <fcntl.h>
-# include <sys/file.h>
-# include <sys/stat.h>
-# include <unistd.h>
-#endif
-
-#if ZEN_WITH_TESTS
-# include <zencore/compress.h>
-# include <algorithm>
-# include <random>
-#endif
-
-template<>
-struct fmt::formatter<zen::GcClock::TimePoint> : formatter<string_view>
-{
- template<typename FormatContext>
- auto format(const zen::GcClock::TimePoint& TimePoint, FormatContext& ctx)
- {
- std::time_t Time = std::chrono::system_clock::to_time_t(TimePoint);
- zen::ExtendableStringBuilder<32> String;
- String << std::ctime(&Time);
- return formatter<string_view>::format(String.ToView(), ctx);
- }
-};
-
-namespace zen {
-
-using namespace std::literals;
-namespace fs = std::filesystem;
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace {
- std::error_code CreateGCReserve(const std::filesystem::path& Path, uint64_t Size)
- {
- if (Size == 0)
- {
- std::filesystem::remove(Path);
- return std::error_code{};
- }
- CreateDirectories(Path.parent_path());
- if (std::filesystem::is_regular_file(Path) && std::filesystem::file_size(Path) == Size)
- {
- return std::error_code();
- }
-#if ZEN_PLATFORM_WINDOWS
- DWORD dwCreationDisposition = CREATE_ALWAYS;
- DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
-
- const DWORD dwShareMode = 0;
- const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
- HANDLE hTemplateFile = nullptr;
-
- HANDLE FileHandle = CreateFile(Path.c_str(),
- dwDesiredAccess,
- dwShareMode,
- /* lpSecurityAttributes */ nullptr,
- dwCreationDisposition,
- dwFlagsAndAttributes,
- hTemplateFile);
-
- if (FileHandle == INVALID_HANDLE_VALUE)
- {
- return MakeErrorCodeFromLastError();
- }
- bool Keep = true;
- auto _ = MakeGuard([&]() {
- ::CloseHandle(FileHandle);
- if (!Keep)
- {
- ::DeleteFile(Path.c_str());
- }
- });
- LARGE_INTEGER liFileSize;
- liFileSize.QuadPart = Size;
- BOOL OK = ::SetFilePointerEx(FileHandle, liFileSize, 0, FILE_BEGIN);
- if (!OK)
- {
- return MakeErrorCodeFromLastError();
- }
- OK = ::SetEndOfFile(FileHandle);
- if (!OK)
- {
- return MakeErrorCodeFromLastError();
- }
- Keep = true;
-#else
- int OpenFlags = O_CLOEXEC | O_RDWR | O_CREAT;
- int Fd = open(Path.c_str(), OpenFlags, 0666);
- if (Fd < 0)
- {
- return MakeErrorCodeFromLastError();
- }
-
- bool Keep = true;
- auto _ = MakeGuard([&]() {
- close(Fd);
- if (!Keep)
- {
- unlink(Path.c_str());
- }
- });
-
- if (fchmod(Fd, 0666) < 0)
- {
- return MakeErrorCodeFromLastError();
- }
-
-# if ZEN_PLATFORM_MAC
- if (ftruncate(Fd, (off_t)Size) < 0)
- {
- return MakeErrorCodeFromLastError();
- }
-# else
- if (ftruncate64(Fd, (off64_t)Size) < 0)
- {
- return MakeErrorCodeFromLastError();
- }
- int Error = posix_fallocate64(Fd, 0, (off64_t)Size);
- if (Error)
- {
- return MakeErrorCode(Error);
- }
-# endif
- Keep = true;
-#endif
- return std::error_code{};
- }
-
-} // namespace
-
-//////////////////////////////////////////////////////////////////////////
-
-CbObject
-LoadCompactBinaryObject(const fs::path& Path)
-{
- FileContents Result = ReadFile(Path);
-
- if (!Result.ErrorCode)
- {
- IoBuffer Buffer = Result.Flatten();
- if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
- {
- return LoadCompactBinaryObject(Buffer);
- }
- }
-
- return CbObject();
-}
-
-void
-SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
-{
- WriteFile(Path, Object.GetBuffer().AsIoBuffer());
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-struct GcContext::GcState
-{
- using CacheKeyContexts = std::unordered_map<std::string, std::vector<IoHash>>;
-
- CacheKeyContexts m_ExpiredCacheKeys;
- HashKeySet m_RetainedCids;
- HashKeySet m_DeletedCids;
- GcClock::TimePoint m_ExpireTime;
- bool m_DeletionMode = true;
- bool m_CollectSmallObjects = false;
-
- std::filesystem::path DiskReservePath;
-};
-
-GcContext::GcContext(const GcClock::TimePoint& ExpireTime) : m_State(std::make_unique<GcState>())
-{
- m_State->m_ExpireTime = ExpireTime;
-}
-
-GcContext::~GcContext()
-{
-}
-
-void
-GcContext::AddRetainedCids(std::span<const IoHash> Cids)
-{
- m_State->m_RetainedCids.AddHashesToSet(Cids);
-}
-
-void
-GcContext::SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys)
-{
- m_State->m_ExpiredCacheKeys[CacheKeyContext] = std::move(ExpiredKeys);
-}
-
-void
-GcContext::IterateCids(std::function<void(const IoHash&)> Callback)
-{
- m_State->m_RetainedCids.IterateHashes([&](const IoHash& Hash) { Callback(Hash); });
-}
-
-void
-GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc)
-{
- m_State->m_RetainedCids.FilterHashes(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); });
-}
-
-void
-GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc)
-{
- m_State->m_RetainedCids.FilterHashes(Cid, std::move(FilterFunc));
-}
-
-void
-GcContext::AddDeletedCids(std::span<const IoHash> Cas)
-{
- m_State->m_DeletedCids.AddHashesToSet(Cas);
-}
-
-const HashKeySet&
-GcContext::DeletedCids()
-{
- return m_State->m_DeletedCids;
-}
-
-std::span<const IoHash>
-GcContext::ExpiredCacheKeys(const std::string& CacheKeyContext) const
-{
- return m_State->m_ExpiredCacheKeys[CacheKeyContext];
-}
-
-bool
-GcContext::IsDeletionMode() const
-{
- return m_State->m_DeletionMode;
-}
-
-void
-GcContext::SetDeletionMode(bool NewState)
-{
- m_State->m_DeletionMode = NewState;
-}
-
-bool
-GcContext::CollectSmallObjects() const
-{
- return m_State->m_CollectSmallObjects;
-}
-
-void
-GcContext::CollectSmallObjects(bool NewState)
-{
- m_State->m_CollectSmallObjects = NewState;
-}
-
-GcClock::TimePoint
-GcContext::ExpireTime() const
-{
- return m_State->m_ExpireTime;
-}
-
-void
-GcContext::DiskReservePath(const std::filesystem::path& Path)
-{
- m_State->DiskReservePath = Path;
-}
-
-uint64_t
-GcContext::ClaimGCReserve()
-{
- if (!std::filesystem::is_regular_file(m_State->DiskReservePath))
- {
- return 0;
- }
- uint64_t ReclaimedSize = std::filesystem::file_size(m_State->DiskReservePath);
- if (std::filesystem::remove(m_State->DiskReservePath))
- {
- return ReclaimedSize;
- }
- return 0;
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-GcContributor::GcContributor(GcManager& Gc) : m_Gc(Gc)
-{
- m_Gc.AddGcContributor(this);
-}
-
-GcContributor::~GcContributor()
-{
- m_Gc.RemoveGcContributor(this);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-GcStorage::GcStorage(GcManager& Gc) : m_Gc(Gc)
-{
- m_Gc.AddGcStorage(this);
-}
-
-GcStorage::~GcStorage()
-{
- m_Gc.RemoveGcStorage(this);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-GcManager::GcManager() : m_Log(logging::Get("gc"))
-{
-}
-
-GcManager::~GcManager()
-{
-}
-
-void
-GcManager::AddGcContributor(GcContributor* Contributor)
-{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_GcContribs.push_back(Contributor);
-}
-
-void
-GcManager::RemoveGcContributor(GcContributor* Contributor)
-{
- RwLock::ExclusiveLockScope _(m_Lock);
- std::erase_if(m_GcContribs, [&](GcContributor* $) { return $ == Contributor; });
-}
-
-void
-GcManager::AddGcStorage(GcStorage* Storage)
-{
- ZEN_ASSERT(Storage != nullptr);
- RwLock::ExclusiveLockScope _(m_Lock);
- m_GcStorage.push_back(Storage);
-}
-
-void
-GcManager::RemoveGcStorage(GcStorage* Storage)
-{
- RwLock::ExclusiveLockScope _(m_Lock);
- std::erase_if(m_GcStorage, [&](GcStorage* $) { return $ == Storage; });
-}
-
-void
-GcManager::CollectGarbage(GcContext& GcCtx)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- // First gather reference set
- {
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- for (GcContributor* Contributor : m_GcContribs)
- {
- Contributor->GatherReferences(GcCtx);
- }
- }
-
- // Then trim storage
- {
- GcStorageSize GCTotalSizeDiff;
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] {
- ZEN_INFO("collected garbage in {}. Removed {} disk space, {} memory",
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- NiceBytes(GCTotalSizeDiff.DiskSize),
- NiceBytes(GCTotalSizeDiff.MemorySize));
- });
- for (GcStorage* Storage : m_GcStorage)
- {
- const auto PreSize = Storage->StorageSize();
- Storage->CollectGarbage(GcCtx);
- const auto PostSize = Storage->StorageSize();
- GCTotalSizeDiff.DiskSize += PreSize.DiskSize > PostSize.DiskSize ? PreSize.DiskSize - PostSize.DiskSize : 0;
- GCTotalSizeDiff.MemorySize += PreSize.MemorySize > PostSize.MemorySize ? PreSize.MemorySize - PostSize.MemorySize : 0;
- }
- }
-}
-
-GcStorageSize
-GcManager::TotalStorageSize() const
-{
- RwLock::SharedLockScope _(m_Lock);
-
- GcStorageSize TotalSize;
-
- for (GcStorage* Storage : m_GcStorage)
- {
- const auto Size = Storage->StorageSize();
- TotalSize.DiskSize += Size.DiskSize;
- TotalSize.MemorySize += Size.MemorySize;
- }
-
- return TotalSize;
-}
-
-#if ZEN_USE_REF_TRACKING
-void
-GcManager::OnNewCidReferences(std::span<IoHash> Hashes)
-{
- ZEN_UNUSED(Hashes);
-}
-
-void
-GcManager::OnCommittedCidReferences(std::span<IoHash> Hashes)
-{
- ZEN_UNUSED(Hashes);
-}
-
-void
-GcManager::OnDroppedCidReferences(std::span<IoHash> Hashes)
-{
- ZEN_UNUSED(Hashes);
-}
-#endif
-
-//////////////////////////////////////////////////////////////////////////
-void
-DiskUsageWindow::KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick)
-{
- auto It = m_LogWindow.begin();
- if (It == m_LogWindow.end())
- {
- return;
- }
- while (It->SampleTime < StartTick)
- {
- ++It;
- if (It == m_LogWindow.end())
- {
- m_LogWindow.clear();
- return;
- }
- }
- m_LogWindow.erase(m_LogWindow.begin(), It);
-
- It = m_LogWindow.begin();
- while (It != m_LogWindow.end())
- {
- if (It->SampleTime >= EndTick)
- {
- m_LogWindow.erase(It, m_LogWindow.end());
- return;
- }
- It++;
- }
-}
-
-std::vector<uint64_t>
-DiskUsageWindow::GetDiskDeltas(GcClock::Tick StartTick, GcClock::Tick EndTick, GcClock::Tick DeltaWidth, uint64_t& OutMaxDelta) const
-{
- ZEN_ASSERT(StartTick != -1);
- ZEN_ASSERT(DeltaWidth > 0);
-
- std::vector<uint64_t> Result;
- Result.reserve((EndTick - StartTick + DeltaWidth - 1) / DeltaWidth);
-
- size_t WindowSize = m_LogWindow.size();
- GcClock::Tick FirstWindowTick = WindowSize < 2 ? EndTick : m_LogWindow[1].SampleTime;
-
- GcClock::Tick RangeStart = StartTick;
- while (FirstWindowTick >= RangeStart + DeltaWidth && RangeStart < EndTick)
- {
- Result.push_back(0);
- RangeStart += DeltaWidth;
- }
-
- uint64_t DeltaSum = 0;
- size_t WindowIndex = 1;
- while (WindowIndex < WindowSize && RangeStart < EndTick)
- {
- const DiskUsageEntry& Entry = m_LogWindow[WindowIndex];
- if (Entry.SampleTime < RangeStart)
- {
- ++WindowIndex;
- continue;
- }
- GcClock::Tick RangeEnd = Min(EndTick, RangeStart + DeltaWidth);
- ZEN_ASSERT(Entry.SampleTime >= RangeStart);
- if (Entry.SampleTime >= RangeEnd)
- {
- Result.push_back(DeltaSum);
- OutMaxDelta = Max(DeltaSum, OutMaxDelta);
- DeltaSum = 0;
- RangeStart = RangeEnd;
- continue;
- }
- const DiskUsageEntry& PrevEntry = m_LogWindow[WindowIndex - 1];
- if (Entry.DiskUsage > PrevEntry.DiskUsage)
- {
- uint64_t Delta = Entry.DiskUsage - PrevEntry.DiskUsage;
- DeltaSum += Delta;
- }
- WindowIndex++;
- }
-
- while (RangeStart < EndTick)
- {
- Result.push_back(DeltaSum);
- OutMaxDelta = Max(DeltaSum, OutMaxDelta);
- DeltaSum = 0;
- RangeStart += DeltaWidth;
- }
- return Result;
-}
-
-GcClock::Tick
-DiskUsageWindow::FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick) const
-{
- ZEN_ASSERT(Amount > 0);
- uint64_t RemainingToFind = Amount;
- size_t Offset = 1;
- while (Offset < m_LogWindow.size())
- {
- const DiskUsageEntry& Entry = m_LogWindow[Offset];
- if (Entry.SampleTime >= EndTick)
- {
- return EndTick;
- }
- const DiskUsageEntry& PreviousEntry = m_LogWindow[Offset - 1];
- uint64_t Delta = Entry.DiskUsage > PreviousEntry.DiskUsage ? Entry.DiskUsage - PreviousEntry.DiskUsage : 0;
- if (Delta >= RemainingToFind)
- {
- return m_LogWindow[Offset].SampleTime + 1;
- }
- RemainingToFind -= Delta;
- Offset++;
- }
- return EndTick;
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager)
-{
-}
-
-GcScheduler::~GcScheduler()
-{
- Shutdown();
-}
-
-void
-GcScheduler::Initialize(const GcSchedulerConfig& Config)
-{
- using namespace std::chrono;
-
- m_Config = Config;
-
- if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval)
- {
- m_Config.Interval = m_Config.MonitorInterval;
- }
-
- std::filesystem::create_directories(Config.RootDirectory);
-
- std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize);
- if (Ec)
- {
- ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'",
- m_Config.RootDirectory / "reserve.gc",
- NiceBytes(m_Config.DiskReserveSize),
- Ec.message());
- }
-
- m_LastGcTime = GcClock::Now();
- m_LastGcExpireTime = GcClock::TimePoint::min();
-
- if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state"))
- {
- m_LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64()));
- m_LastGcExpireTime =
- GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcExpireTime"].AsInt64(GcClock::Duration::min().count())));
- if (m_LastGcTime + m_Config.Interval < GcClock::Now())
- {
- // TODO: Trigger GC?
- m_LastGcTime = GcClock::Now();
- }
- }
-
- m_DiskUsageLog.Open(m_Config.RootDirectory / "gc.dlog", CasLogFile::Mode::kWrite);
- m_DiskUsageLog.Initialize();
- const GcClock::Tick LastGCTick = m_LastGcTime.time_since_epoch().count();
- m_DiskUsageLog.Replay(
- [this, LastGCTick](const DiskUsageWindow::DiskUsageEntry& Entry) {
- if (Entry.SampleTime >= m_LastGcExpireTime.time_since_epoch().count())
- {
- m_DiskUsageWindow.Append(Entry);
- }
- },
- 0);
-
- m_NextGcTime = NextGcTime(m_LastGcTime);
- m_GcThread = std::thread(&GcScheduler::SchedulerThread, this);
-}
-
-void
-GcScheduler::Shutdown()
-{
- if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
- {
- bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning);
- m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
- m_GcSignal.notify_one();
-
- if (m_GcThread.joinable())
- {
- if (GcIsRunning)
- {
- ZEN_INFO("Waiting for garbage collection to complete");
- }
- m_GcThread.join();
- }
- }
- m_DiskUsageLog.Flush();
- m_DiskUsageLog.Close();
-}
-
-bool
-GcScheduler::Trigger(const GcScheduler::TriggerParams& Params)
-{
- if (m_Config.Enabled)
- {
- std::unique_lock Lock(m_GcMutex);
- if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
- {
- m_TriggerParams = Params;
- uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
- if (m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
- {
- m_GcSignal.notify_one();
- return true;
- }
- }
- }
-
- return false;
-}
-
-void
-GcScheduler::SchedulerThread()
-{
- std::chrono::seconds WaitTime{0};
-
- for (;;)
- {
- bool Timeout = false;
- {
- ZEN_ASSERT(WaitTime.count() >= 0);
- std::unique_lock Lock(m_GcMutex);
- Timeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, WaitTime);
- }
-
- if (Status() == GcSchedulerStatus::kStopped)
- {
- break;
- }
-
- if (!m_Config.Enabled)
- {
- WaitTime = std::chrono::seconds::max();
- continue;
- }
-
- if (!Timeout && Status() == GcSchedulerStatus::kIdle)
- {
- continue;
- }
-
- bool Delete = true;
- bool CollectSmallObjects = m_Config.CollectSmallObjects;
- std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration;
- uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit;
- GcClock::TimePoint Now = GcClock::Now();
- if (m_TriggerParams)
- {
- const auto TriggerParams = m_TriggerParams.value();
- m_TriggerParams.reset();
-
- CollectSmallObjects = TriggerParams.CollectSmallObjects;
- if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max())
- {
- MaxCacheDuration = TriggerParams.MaxCacheDuration;
- }
- if (TriggerParams.DiskSizeSoftLimit != 0)
- {
- DiskSizeSoftLimit = TriggerParams.DiskSizeSoftLimit;
- }
- }
-
- GcClock::TimePoint ExpireTime = MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration;
-
- std::error_code Ec;
- const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
-
- if (Timeout && Status() == GcSchedulerStatus::kIdle)
- {
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
- if (Ec)
- {
- ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message());
- }
-
- const int64_t PressureGraphLength = 30;
- const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval;
- std::vector<uint64_t> DiskDeltas;
- uint64_t MaxLoad = 0;
- {
- const GcClock::Tick EpochTickCount = GcClock::Now().time_since_epoch().count();
- std::unique_lock Lock(m_GcMutex);
- m_DiskUsageWindow.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
- m_DiskUsageLog.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
- const GcClock::TimePoint LoadGraphStartTime = Now - LoadGraphTime;
- GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count();
- GcClock::Tick End = Now.time_since_epoch().count();
- DiskDeltas = m_DiskUsageWindow.GetDiskDeltas(Start,
- End,
- Max(1, (End - Start + PressureGraphLength - 1) / PressureGraphLength),
- MaxLoad);
- }
-
- std::string LoadGraph;
- LoadGraph.resize(DiskDeltas.size(), '0');
- if (DiskDeltas.size() > 0 && MaxLoad > 0)
- {
- char LoadIndicator[11] = "0123456789";
- for (size_t Index = 0; Index < DiskDeltas.size(); ++Index)
- {
- size_t LoadIndex = (9 * DiskDeltas[Index] + MaxLoad - 1) / MaxLoad;
- LoadGraph[Index] = LoadIndicator[LoadIndex];
- }
- }
-
- uint64_t GcDiskSpaceGoal = 0;
- if (DiskSizeSoftLimit != 0 && TotalSize.DiskSize > DiskSizeSoftLimit)
- {
- GcDiskSpaceGoal = TotalSize.DiskSize - DiskSizeSoftLimit;
- std::unique_lock Lock(m_GcMutex);
- GcClock::Tick AgeTick = m_DiskUsageWindow.FindTimepointThatRemoves(GcDiskSpaceGoal, Now.time_since_epoch().count());
- GcClock::TimePoint SizeBasedExpireTime = GcClock::TimePointFromTick(AgeTick);
- if (SizeBasedExpireTime > ExpireTime)
- {
- ExpireTime = SizeBasedExpireTime;
- }
- }
-
- bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0;
-
- std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now());
-
- if (RemaingTime < std::chrono::seconds::zero())
- {
- RemaingTime = std::chrono::seconds::zero();
- }
-
- bool TimeBasedGCTriggered = !DiskSpaceGCTriggered && RemaingTime.count() == 0;
- ZEN_INFO(
- "{} in use,{} {} of total {} free disk space, disk writes last {} per {} [{}], peak {}/s. {}",
- NiceBytes(TotalSize.DiskSize),
- DiskSizeSoftLimit == 0 ? "" : fmt::format(" {} soft limit,", NiceBytes(DiskSizeSoftLimit)),
- NiceBytes(Space.Free),
- NiceBytes(Space.Total),
- NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count())),
- NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count() / PressureGraphLength)),
- LoadGraph,
- NiceBytes(MaxLoad * uint64_t(std::chrono::seconds(1).count()) / uint64_t(std::chrono::seconds(LoadGraphTime).count())),
- DiskSpaceGCTriggered ? fmt::format("Disk use threshold triggered, trying to reclaim {}. ", NiceBytes(GcDiskSpaceGoal))
- : TimeBasedGCTriggered ? "GC schedule triggered."
- : m_NextGcTime == GcClock::TimePoint::max()
- ? ""
- : fmt::format("{} until next scheduled GC.", NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemaingTime).count()))));
-
- if (!DiskSpaceGCTriggered && !TimeBasedGCTriggered)
- {
- WaitTime = m_Config.MonitorInterval < RemaingTime ? m_Config.MonitorInterval : RemaingTime;
- continue;
- }
-
- WaitTime = m_Config.MonitorInterval;
- uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
- if (!m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
- {
- continue;
- }
- }
-
- CollectGarbage(ExpireTime, Delete, CollectSmallObjects);
-
- uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
- if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
- {
- ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped));
- break;
- }
-
- WaitTime = m_Config.MonitorInterval;
- }
-}
-
-GcClock::TimePoint
-GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime)
-{
- if (m_Config.Interval.count())
- {
- return CurrentTime + m_Config.Interval;
- }
- else
- {
- return GcClock::TimePoint::max();
- }
-}
-
-void
-GcScheduler::CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects)
-{
- GcContext GcCtx(ExpireTime);
- GcCtx.SetDeletionMode(Delete);
- GcCtx.CollectSmallObjects(CollectSmallObjects);
- // GcCtx.MaxCacheDuration(MaxCacheDuration);
- GcCtx.DiskReservePath(m_Config.RootDirectory / "reserve.gc");
-
- ZEN_INFO("garbage collection STARTING, small objects gc {}, cutoff time {}",
- GcCtx.CollectSmallObjects() ? "ENABLED"sv : "DISABLED"sv,
- ExpireTime);
- {
- Stopwatch Timer;
- const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
-
- m_GcManager.CollectGarbage(GcCtx);
-
- if (Delete)
- {
- m_LastGcExpireTime = ExpireTime;
- std::unique_lock Lock(m_GcMutex);
- m_DiskUsageWindow.KeepRange(ExpireTime.time_since_epoch().count(), GcClock::Duration::max().count());
- }
-
- m_LastGcTime = GcClock::Now();
- m_NextGcTime = NextGcTime(m_LastGcTime);
-
- {
- const fs::path Path = m_Config.RootDirectory / "gc_state";
- ZEN_DEBUG("saving scheduler state to '{}'", Path);
- CbObjectWriter SchedulerState;
- SchedulerState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count());
- SchedulerState << "LastGcExpireTime"sv << static_cast<int64_t>(m_LastGcExpireTime.time_since_epoch().count());
- SaveCompactBinaryObject(Path, SchedulerState.Save());
- }
-
- std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize);
- if (Ec)
- {
- ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason: '{}'",
- m_Config.RootDirectory / "reserve.gc",
- NiceBytes(m_Config.DiskReserveSize),
- Ec.message());
- }
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-
-namespace gc::impl {
- static IoBuffer CreateChunk(uint64_t Size)
- {
- static std::random_device rd;
- static std::mt19937 g(rd());
-
- std::vector<uint8_t> Values;
- Values.resize(Size);
- for (size_t Idx = 0; Idx < Size; ++Idx)
- {
- Values[Idx] = static_cast<uint8_t>(Idx);
- }
- std::shuffle(Values.begin(), Values.end(), g);
-
- return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
- }
-
- static CompressedBuffer Compress(IoBuffer Buffer)
- {
- return CompressedBuffer::Compress(SharedBuffer::MakeView(Buffer.GetData(), Buffer.GetSize()));
- }
-} // namespace gc::impl
-
-TEST_CASE("gc.basic")
-{
- using namespace gc::impl;
-
- ScopedTemporaryDirectory TempDir;
-
- CidStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path() / "cas";
-
- GcManager Gc;
- CidStore CidStore(Gc);
-
- CidStore.Initialize(CasConfig);
-
- IoBuffer Chunk = CreateChunk(128);
- auto CompressedChunk = Compress(Chunk);
-
- const auto InsertResult = CidStore.AddChunk(CompressedChunk.GetCompressed().Flatten().AsIoBuffer(), CompressedChunk.DecodeRawHash());
- CHECK(InsertResult.New);
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
-
- CidStore.Flush();
- Gc.CollectGarbage(GcCtx);
-
- CHECK(!CidStore.ContainsChunk(CompressedChunk.DecodeRawHash()));
-}
-
-TEST_CASE("gc.full")
-{
- using namespace gc::impl;
-
- ScopedTemporaryDirectory TempDir;
-
- CidStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path() / "cas";
-
- GcManager Gc;
- std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc);
-
- CasStore->Initialize(CasConfig);
-
- uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
- IoBuffer Chunks[9] = {CreateChunk(ChunkSizes[0]),
- CreateChunk(ChunkSizes[1]),
- CreateChunk(ChunkSizes[2]),
- CreateChunk(ChunkSizes[3]),
- CreateChunk(ChunkSizes[4]),
- CreateChunk(ChunkSizes[5]),
- CreateChunk(ChunkSizes[6]),
- CreateChunk(ChunkSizes[7]),
- CreateChunk(ChunkSizes[8])};
- IoHash ChunkHashes[9] = {
- IoHash::HashBuffer(Chunks[0].Data(), Chunks[0].Size()),
- IoHash::HashBuffer(Chunks[1].Data(), Chunks[1].Size()),
- IoHash::HashBuffer(Chunks[2].Data(), Chunks[2].Size()),
- IoHash::HashBuffer(Chunks[3].Data(), Chunks[3].Size()),
- IoHash::HashBuffer(Chunks[4].Data(), Chunks[4].Size()),
- IoHash::HashBuffer(Chunks[5].Data(), Chunks[5].Size()),
- IoHash::HashBuffer(Chunks[6].Data(), Chunks[6].Size()),
- IoHash::HashBuffer(Chunks[7].Data(), Chunks[7].Size()),
- IoHash::HashBuffer(Chunks[8].Data(), Chunks[8].Size()),
- };
-
- CasStore->InsertChunk(Chunks[0], ChunkHashes[0]);
- CasStore->InsertChunk(Chunks[1], ChunkHashes[1]);
- CasStore->InsertChunk(Chunks[2], ChunkHashes[2]);
- CasStore->InsertChunk(Chunks[3], ChunkHashes[3]);
- CasStore->InsertChunk(Chunks[4], ChunkHashes[4]);
- CasStore->InsertChunk(Chunks[5], ChunkHashes[5]);
- CasStore->InsertChunk(Chunks[6], ChunkHashes[6]);
- CasStore->InsertChunk(Chunks[7], ChunkHashes[7]);
- CasStore->InsertChunk(Chunks[8], ChunkHashes[8]);
-
- CidStoreSize InitialSize = CasStore->TotalSize();
-
- // Keep first and last
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
-
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[0]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- CasStore->Flush();
- Gc.CollectGarbage(GcCtx);
-
- CHECK(CasStore->ContainsChunk(ChunkHashes[0]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[1]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[2]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[3]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[4]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[5]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[6]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[7]));
- CHECK(CasStore->ContainsChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8])));
- }
-
- CasStore->InsertChunk(Chunks[1], ChunkHashes[1]);
- CasStore->InsertChunk(Chunks[2], ChunkHashes[2]);
- CasStore->InsertChunk(Chunks[3], ChunkHashes[3]);
- CasStore->InsertChunk(Chunks[4], ChunkHashes[4]);
- CasStore->InsertChunk(Chunks[5], ChunkHashes[5]);
- CasStore->InsertChunk(Chunks[6], ChunkHashes[6]);
- CasStore->InsertChunk(Chunks[7], ChunkHashes[7]);
-
- // Keep last
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- CasStore->Flush();
- Gc.CollectGarbage(GcCtx);
-
- CHECK(!CasStore->ContainsChunk(ChunkHashes[0]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[1]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[2]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[3]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[4]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[5]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[6]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[7]));
- CHECK(CasStore->ContainsChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8])));
-
- CasStore->InsertChunk(Chunks[1], ChunkHashes[1]);
- CasStore->InsertChunk(Chunks[2], ChunkHashes[2]);
- CasStore->InsertChunk(Chunks[3], ChunkHashes[3]);
- CasStore->InsertChunk(Chunks[4], ChunkHashes[4]);
- CasStore->InsertChunk(Chunks[5], ChunkHashes[5]);
- CasStore->InsertChunk(Chunks[6], ChunkHashes[6]);
- CasStore->InsertChunk(Chunks[7], ChunkHashes[7]);
- }
-
- // Keep mixed
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[1]);
- KeepChunks.push_back(ChunkHashes[4]);
- KeepChunks.push_back(ChunkHashes[7]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- CasStore->Flush();
- Gc.CollectGarbage(GcCtx);
-
- CHECK(!CasStore->ContainsChunk(ChunkHashes[0]));
- CHECK(CasStore->ContainsChunk(ChunkHashes[1]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[2]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[3]));
- CHECK(CasStore->ContainsChunk(ChunkHashes[4]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[5]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[6]));
- CHECK(CasStore->ContainsChunk(ChunkHashes[7]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7])));
-
- CasStore->InsertChunk(Chunks[0], ChunkHashes[0]);
- CasStore->InsertChunk(Chunks[2], ChunkHashes[2]);
- CasStore->InsertChunk(Chunks[3], ChunkHashes[3]);
- CasStore->InsertChunk(Chunks[5], ChunkHashes[5]);
- CasStore->InsertChunk(Chunks[6], ChunkHashes[6]);
- CasStore->InsertChunk(Chunks[8], ChunkHashes[8]);
- }
-
- // Keep multiple at end
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[6]);
- KeepChunks.push_back(ChunkHashes[7]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- CasStore->Flush();
- Gc.CollectGarbage(GcCtx);
-
- CHECK(!CasStore->ContainsChunk(ChunkHashes[0]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[1]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[2]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[3]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[4]));
- CHECK(!CasStore->ContainsChunk(ChunkHashes[5]));
- CHECK(CasStore->ContainsChunk(ChunkHashes[6]));
- CHECK(CasStore->ContainsChunk(ChunkHashes[7]));
- CHECK(CasStore->ContainsChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8])));
-
- CasStore->InsertChunk(Chunks[0], ChunkHashes[0]);
- CasStore->InsertChunk(Chunks[1], ChunkHashes[1]);
- CasStore->InsertChunk(Chunks[2], ChunkHashes[2]);
- CasStore->InsertChunk(Chunks[3], ChunkHashes[3]);
- CasStore->InsertChunk(Chunks[4], ChunkHashes[4]);
- CasStore->InsertChunk(Chunks[5], ChunkHashes[5]);
- }
-
- // Verify that we nicely appended blocks even after all GC operations
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[2] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[2])));
- CHECK(ChunkHashes[3] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[3])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[5] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[5])));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8])));
-
- auto FinalSize = CasStore->TotalSize();
-
- CHECK_LE(InitialSize.TinySize, FinalSize.TinySize);
- CHECK_GE(InitialSize.TinySize + (1u << 28), FinalSize.TinySize);
-}
-
-TEST_CASE("gc.diskusagewindow")
-{
- using namespace gc::impl;
-
- DiskUsageWindow Stats;
- Stats.Append({.SampleTime = 0, .DiskUsage = 0}); // 0 0
- Stats.Append({.SampleTime = 10, .DiskUsage = 10}); // 1 10
- Stats.Append({.SampleTime = 20, .DiskUsage = 20}); // 2 10
- Stats.Append({.SampleTime = 30, .DiskUsage = 20}); // 3 0
- Stats.Append({.SampleTime = 40, .DiskUsage = 15}); // 4 0
- Stats.Append({.SampleTime = 50, .DiskUsage = 25}); // 5 10
- Stats.Append({.SampleTime = 60, .DiskUsage = 30}); // 6 5
- Stats.Append({.SampleTime = 70, .DiskUsage = 45}); // 7 15
-
- SUBCASE("Truncate start")
- {
- Stats.KeepRange(-15, 31);
- CHECK(Stats.m_LogWindow.size() == 4);
- CHECK(Stats.m_LogWindow[0].SampleTime == 0);
- CHECK(Stats.m_LogWindow[3].SampleTime == 30);
- }
-
- SUBCASE("Truncate end")
- {
- Stats.KeepRange(70, 71);
- CHECK(Stats.m_LogWindow.size() == 1);
- CHECK(Stats.m_LogWindow[0].SampleTime == 70);
- }
-
- SUBCASE("Truncate middle")
- {
- Stats.KeepRange(29, 69);
- CHECK(Stats.m_LogWindow.size() == 4);
- CHECK(Stats.m_LogWindow[0].SampleTime == 30);
- CHECK(Stats.m_LogWindow[3].SampleTime == 60);
- }
-
- SUBCASE("Full range")
- {
- uint64_t MaxDelta = 0;
- // 0-10, 10-20, 20-30, 30-40, 40-50, 50-60, 60-70, 70-80
- std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(0, 80, 10, MaxDelta);
- CHECK(DiskDeltas.size() == 8);
- CHECK(MaxDelta == 15);
- CHECK(DiskDeltas[0] == 0);
- CHECK(DiskDeltas[1] == 10);
- CHECK(DiskDeltas[2] == 10);
- CHECK(DiskDeltas[3] == 0);
- CHECK(DiskDeltas[4] == 0);
- CHECK(DiskDeltas[5] == 10);
- CHECK(DiskDeltas[6] == 5);
- CHECK(DiskDeltas[7] == 15);
- }
-
- SUBCASE("Sub range")
- {
- uint64_t MaxDelta = 0;
- std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(20, 40, 10, MaxDelta);
- CHECK(DiskDeltas.size() == 2);
- CHECK(MaxDelta == 10);
- CHECK(DiskDeltas[0] == 10); // [20:30]
- CHECK(DiskDeltas[1] == 0); // [30:40]
- }
- SUBCASE("Unaligned sub range 1")
- {
- uint64_t MaxDelta = 0;
- std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(21, 51, 10, MaxDelta);
- CHECK(DiskDeltas.size() == 3);
- CHECK(MaxDelta == 10);
- CHECK(DiskDeltas[0] == 0); // [21:31]
- CHECK(DiskDeltas[1] == 0); // [31:41]
- CHECK(DiskDeltas[2] == 10); // [41:51]
- }
- SUBCASE("Unaligned end range")
- {
- uint64_t MaxDelta = 0;
- std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(29, 79, 10, MaxDelta);
- CHECK(DiskDeltas.size() == 5);
- CHECK(MaxDelta == 15);
- CHECK(DiskDeltas[0] == 0); // [29:39]
- CHECK(DiskDeltas[1] == 0); // [39:49]
- CHECK(DiskDeltas[2] == 10); // [49:59]
- CHECK(DiskDeltas[3] == 5); // [59:69]
- CHECK(DiskDeltas[4] == 15); // [69:79]
- }
- SUBCASE("Ahead of window")
- {
- uint64_t MaxDelta = 0;
- std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(-40, 0, 10, MaxDelta);
- CHECK(DiskDeltas.size() == 4);
- CHECK(MaxDelta == 0);
- CHECK(DiskDeltas[0] == 0); // [-40:-30]
- CHECK(DiskDeltas[1] == 0); // [-30:-20]
- CHECK(DiskDeltas[2] == 0); // [-20:-10]
- CHECK(DiskDeltas[3] == 0); // [-10:0]
- }
- SUBCASE("After of window")
- {
- uint64_t MaxDelta = 0;
- std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(90, 120, 10, MaxDelta);
- CHECK(DiskDeltas.size() == 3);
- CHECK(MaxDelta == 0);
- CHECK(DiskDeltas[0] == 0); // [90:100]
- CHECK(DiskDeltas[1] == 0); // [100:110]
- CHECK(DiskDeltas[2] == 0); // [110:120]
- }
- SUBCASE("Encapsulating window")
- {
- uint64_t MaxDelta = 0;
- std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(-20, 100, 10, MaxDelta);
- CHECK(DiskDeltas.size() == 12);
- CHECK(MaxDelta == 15);
- CHECK(DiskDeltas[0] == 0); // [-20:-10]
- CHECK(DiskDeltas[1] == 0); // [ -10:0]
- CHECK(DiskDeltas[2] == 0); // [0:10]
- CHECK(DiskDeltas[3] == 10); // [10:20]
- CHECK(DiskDeltas[4] == 10); // [20:30]
- CHECK(DiskDeltas[5] == 0); // [30:40]
- CHECK(DiskDeltas[6] == 0); // [40:50]
- CHECK(DiskDeltas[7] == 10); // [50:60]
- CHECK(DiskDeltas[8] == 5); // [60:70]
- CHECK(DiskDeltas[9] == 15); // [70:80]
- CHECK(DiskDeltas[10] == 0); // [80:90]
- CHECK(DiskDeltas[11] == 0); // [90:100]
- }
-
- SUBCASE("Full range half stride")
- {
- uint64_t MaxDelta = 0;
- std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(0, 80, 20, MaxDelta);
- CHECK(DiskDeltas.size() == 4);
- CHECK(MaxDelta == 20);
- CHECK(DiskDeltas[0] == 10); // [0:20]
- CHECK(DiskDeltas[1] == 10); // [20:40]
- CHECK(DiskDeltas[2] == 10); // [40:60]
- CHECK(DiskDeltas[3] == 20); // [60:80]
- }
-
- SUBCASE("Partial odd stride")
- {
- uint64_t MaxDelta = 0;
- std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(13, 67, 18, MaxDelta);
- CHECK(DiskDeltas.size() == 3);
- CHECK(MaxDelta == 15);
- CHECK(DiskDeltas[0] == 10); // [13:31]
- CHECK(DiskDeltas[1] == 0); // [31:49]
- CHECK(DiskDeltas[2] == 15); // [49:67]
- }
-
- SUBCASE("Find size window")
- {
- DiskUsageWindow Empty;
- CHECK(Empty.FindTimepointThatRemoves(15u, 10000) == 10000);
-
- CHECK(Stats.FindTimepointThatRemoves(15u, 40) == 21);
- CHECK(Stats.FindTimepointThatRemoves(15u, 20) == 20);
- CHECK(Stats.FindTimepointThatRemoves(100000u, 50) == 50);
- CHECK(Stats.FindTimepointThatRemoves(100000u, 1000));
- }
-}
-#endif
-
-void
-gc_forcelink()
-{
-}
-
-} // namespace zen
diff --git a/zenstore/hashkeyset.cpp b/zenstore/hashkeyset.cpp
deleted file mode 100644
index a5436f5cb..000000000
--- a/zenstore/hashkeyset.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenstore/hashkeyset.h>
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-
-void
-HashKeySet::AddHashToSet(const IoHash& HashToAdd)
-{
- m_HashSet.insert(HashToAdd);
-}
-
-void
-HashKeySet::AddHashesToSet(std::span<const IoHash> HashesToAdd)
-{
- m_HashSet.insert(HashesToAdd.begin(), HashesToAdd.end());
-}
-
-void
-HashKeySet::RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
-{
- for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd;)
- {
- if (Predicate(*It))
- {
- It = m_HashSet.erase(It);
- }
- else
- {
- ++It;
- }
- }
-}
-
-void
-HashKeySet::IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const
-{
- for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd; ++It)
- {
- Callback(*It);
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-//
-// Testing related code follows...
-//
-
-#if ZEN_WITH_TESTS
-
-void
-hashkeyset_forcelink()
-{
-}
-
-#endif
-
-} // namespace zen
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
deleted file mode 100644
index 857ccae38..000000000
--- a/zenstore/include/zenstore/blockstore.h
+++ /dev/null
@@ -1,175 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/filesystem.h>
-#include <zencore/zencore.h>
-#include <zenutil/basicfile.h>
-
-#include <unordered_map>
-#include <unordered_set>
-
-namespace zen {
-
-//////////////////////////////////////////////////////////////////////////
-
-struct BlockStoreLocation
-{
- uint32_t BlockIndex;
- uint64_t Offset;
- uint64_t Size;
-
- inline auto operator<=>(const BlockStoreLocation& Rhs) const = default;
-};
-
-#pragma pack(push)
-#pragma pack(1)
-
-struct BlockStoreDiskLocation
-{
- constexpr static uint32_t MaxBlockIndexBits = 20;
- constexpr static uint32_t MaxOffsetBits = 28;
- constexpr static uint32_t MaxBlockIndex = (1ul << BlockStoreDiskLocation::MaxBlockIndexBits) - 1ul;
- constexpr static uint32_t MaxOffset = (1ul << BlockStoreDiskLocation::MaxOffsetBits) - 1ul;
-
- BlockStoreDiskLocation(const BlockStoreLocation& Location, uint64_t OffsetAlignment)
- {
- Init(Location.BlockIndex, Location.Offset / OffsetAlignment, Location.Size);
- }
-
- BlockStoreDiskLocation() = default;
-
- inline BlockStoreLocation Get(uint64_t OffsetAlignment) const
- {
- uint64_t PackedOffset = 0;
- memcpy(&PackedOffset, &m_Offset, sizeof m_Offset);
- return {.BlockIndex = static_cast<std::uint32_t>(PackedOffset >> MaxOffsetBits),
- .Offset = (PackedOffset & MaxOffset) * OffsetAlignment,
- .Size = GetSize()};
- }
-
- inline uint32_t GetBlockIndex() const
- {
- uint64_t PackedOffset = 0;
- memcpy(&PackedOffset, &m_Offset, sizeof m_Offset);
- return static_cast<std::uint32_t>(PackedOffset >> MaxOffsetBits);
- }
-
- inline uint64_t GetOffset(uint64_t OffsetAlignment) const
- {
- uint64_t PackedOffset = 0;
- memcpy(&PackedOffset, &m_Offset, sizeof m_Offset);
- return (PackedOffset & MaxOffset) * OffsetAlignment;
- }
-
- inline uint64_t GetSize() const { return m_Size; }
-
- inline auto operator<=>(const BlockStoreDiskLocation& Rhs) const = default;
-
-private:
- inline void Init(uint32_t BlockIndex, uint64_t Offset, uint64_t Size)
- {
- ZEN_ASSERT(BlockIndex <= MaxBlockIndex);
- ZEN_ASSERT(Offset <= MaxOffset);
- ZEN_ASSERT(Size <= std::numeric_limits<std::uint32_t>::max());
-
- m_Size = static_cast<uint32_t>(Size);
- uint64_t PackedOffset = (static_cast<uint64_t>(BlockIndex) << MaxOffsetBits) + Offset;
- memcpy(&m_Offset[0], &PackedOffset, sizeof m_Offset);
- }
-
- uint32_t m_Size;
- uint8_t m_Offset[6];
-};
-
-#pragma pack(pop)
-
-struct BlockStoreFile : public RefCounted
-{
- explicit BlockStoreFile(const std::filesystem::path& BlockPath);
- ~BlockStoreFile();
- const std::filesystem::path& GetPath() const;
- void Open();
- void Create(uint64_t InitialSize);
- void MarkAsDeleteOnClose();
- uint64_t FileSize();
- IoBuffer GetChunk(uint64_t Offset, uint64_t Size);
- void Read(void* Data, uint64_t Size, uint64_t FileOffset);
- void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
- void Flush();
- BasicFile& GetBasicFile();
- void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
-
-private:
- const std::filesystem::path m_Path;
- IoBuffer m_IoBuffer;
- BasicFile m_File;
-};
-
-class BlockStore
-{
-public:
- struct ReclaimSnapshotState
- {
- std::unordered_set<uint32_t> m_ActiveWriteBlocks;
- size_t BlockCount;
- };
-
- typedef std::vector<std::pair<size_t, BlockStoreLocation>> MovedChunksArray;
- typedef std::vector<size_t> ChunkIndexArray;
-
- typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback;
- typedef std::function<uint64_t()> ClaimDiskReserveCallback;
- typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback;
- typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
- typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback;
-
- void Initialize(const std::filesystem::path& BlocksBasePath,
- uint64_t MaxBlockSize,
- uint64_t MaxBlockCount,
- const std::vector<BlockStoreLocation>& KnownLocations);
- void Close();
-
- void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback);
-
- IoBuffer TryGetChunk(const BlockStoreLocation& Location) const;
- void Flush();
-
- ReclaimSnapshotState GetReclaimSnapshotState();
- void ReclaimSpace(
- const ReclaimSnapshotState& Snapshot,
- const std::vector<BlockStoreLocation>& ChunkLocations,
- const ChunkIndexArray& KeepChunkIndexes,
- uint64_t PayloadAlignment,
- bool DryRun,
- const ReclaimCallback& ChangeCallback = [](const MovedChunksArray&, const ChunkIndexArray&) {},
- const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; });
-
- void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
- const IterateChunksSmallSizeCallback& SmallSizeCallback,
- const IterateChunksLargeSizeCallback& LargeSizeCallback);
-
- static const char* GetBlockFileExtension();
- static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);
-
- inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }
-
-private:
- std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
-
- mutable RwLock m_InsertLock; // used to serialize inserts
- Ref<BlockStoreFile> m_WriteBlock;
- std::uint64_t m_CurrentInsertOffset = 0;
- std::atomic_uint32_t m_WriteBlockIndex{};
- std::vector<uint32_t> m_ActiveWriteBlocks;
-
- uint64_t m_MaxBlockSize = 1u << 28;
- uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1;
- std::filesystem::path m_BlocksBasePath;
-
- std::atomic_uint64_t m_TotalSize{};
-};
-
-void blockstore_forcelink();
-
-} // namespace zen
diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h
deleted file mode 100644
index d8c3f22f3..000000000
--- a/zenstore/include/zenstore/caslog.h
+++ /dev/null
@@ -1,91 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/uid.h>
-#include <zenutil/basicfile.h>
-
-namespace zen {
-
-class CasLogFile
-{
-public:
- CasLogFile();
- ~CasLogFile();
-
- enum class Mode
- {
- kRead,
- kWrite,
- kTruncate
- };
-
- static bool IsValid(std::filesystem::path FileName, size_t RecordSize);
- void Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode);
- void Append(const void* DataPointer, uint64_t DataSize);
- void Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount);
- void Flush();
- void Close();
- uint64_t GetLogSize();
- uint64_t GetLogCount();
-
-private:
- struct FileHeader
- {
- uint8_t Magic[16];
- uint32_t RecordSize = 0;
- Oid LogId;
- uint32_t ValidatedTail = 0;
- uint32_t Pad[6];
- uint32_t Checksum = 0;
-
- static const inline uint8_t MagicSequence[16] = {'.', '-', '=', ' ', 'C', 'A', 'S', 'L', 'O', 'G', 'v', '1', ' ', '=', '-', '.'};
-
- ZENCORE_API uint32_t ComputeChecksum();
- void Finalize() { Checksum = ComputeChecksum(); }
- };
-
- static_assert(sizeof(FileHeader) == 64);
-
-private:
- void Open(std::filesystem::path FileName, size_t RecordSize, BasicFile::Mode Mode);
-
- BasicFile m_File;
- FileHeader m_Header;
- size_t m_RecordSize = 1;
- std::atomic<uint64_t> m_AppendOffset = 0;
-};
-
-template<typename T>
-class TCasLogFile : public CasLogFile
-{
-public:
- static bool IsValid(std::filesystem::path FileName) { return CasLogFile::IsValid(FileName, sizeof(T)); }
- void Open(std::filesystem::path FileName, Mode Mode) { CasLogFile::Open(FileName, sizeof(T), Mode); }
-
- // This should be called before the Replay() is called to do some basic sanity checking
- bool Initialize() { return true; }
-
- void Replay(Invocable<const T&> auto Handler, uint64_t SkipEntryCount)
- {
- CasLogFile::Replay(
- [&](const void* VoidPtr) {
- const T& Record = *reinterpret_cast<const T*>(VoidPtr);
-
- Handler(Record);
- },
- SkipEntryCount);
- }
-
- void Append(const T& Record)
- {
- // TODO: implement some more efficent path here so we don't end up with
- // a syscall per append
-
- CasLogFile::Append(&Record, sizeof Record);
- }
-
- void Append(const std::span<T>& Records) { CasLogFile::Append(Records.data(), sizeof(T) * Records.size()); }
-};
-
-} // namespace zen
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
deleted file mode 100644
index 16ca78225..000000000
--- a/zenstore/include/zenstore/cidstore.h
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "zenstore.h"
-
-#include <zencore/iohash.h>
-#include <zenstore/hashkeyset.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <tsl/robin_map.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#include <filesystem>
-
-namespace zen {
-
-class GcManager;
-class CasStore;
-class CompressedBuffer;
-class IoBuffer;
-class ScrubContext;
-
-/** Content Store
- *
- * Data in the content store is referenced by content identifiers (CIDs), it works
- * with compressed buffers so the CID is expected to be the RAW hash. It stores the
- * chunk directly under the RAW hash.
- * This class maps uncompressed hashes (CIDs) to compressed hashes and may
- * be used to deal with other kinds of indirections in the future. For example, if we want
- * to support chunking then a CID may represent a list of chunks which could be concatenated
- * to form the referenced chunk.
- *
- */
-
-struct CidStoreSize
-{
- uint64_t TinySize = 0;
- uint64_t SmallSize = 0;
- uint64_t LargeSize = 0;
- uint64_t TotalSize = 0;
-};
-
-struct CidStoreConfiguration
-{
- // Root directory for CAS store
- std::filesystem::path RootDirectory;
-
- // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy
- uint64_t TinyValueThreshold = 1024;
-
- // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy
- uint64_t HugeValueThreshold = 1024 * 1024;
-};
-
-class CidStore
-{
-public:
- CidStore(GcManager& Gc);
- ~CidStore();
-
- struct InsertResult
- {
- bool New = false;
- };
- enum class InsertMode
- {
- kCopyOnly,
- kMayBeMovedInPlace
- };
-
- void Initialize(const CidStoreConfiguration& Config);
- InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace);
- IoBuffer FindChunkByCid(const IoHash& DecompressedId);
- bool ContainsChunk(const IoHash& DecompressedId);
- void FilterChunks(HashKeySet& InOutChunks);
- void Flush();
- void Scrub(ScrubContext& Ctx);
- CidStoreSize TotalSize() const;
-
-private:
- struct Impl;
- std::unique_ptr<CasStore> m_CasStore;
- std::unique_ptr<Impl> m_Impl;
-};
-
-} // namespace zen
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
deleted file mode 100644
index e0354b331..000000000
--- a/zenstore/include/zenstore/gc.h
+++ /dev/null
@@ -1,242 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/iohash.h>
-#include <zencore/thread.h>
-#include <zenstore/caslog.h>
-
-#include <atomic>
-#include <chrono>
-#include <condition_variable>
-#include <filesystem>
-#include <functional>
-#include <optional>
-#include <span>
-#include <thread>
-
-#define ZEN_USE_REF_TRACKING 0 // This is not currently functional
-
-namespace spdlog {
-class logger;
-}
-
-namespace zen {
-
-class HashKeySet;
-class GcManager;
-class CidStore;
-struct IoHash;
-
-/** GC clock
- */
-class GcClock
-{
-public:
- using Clock = std::chrono::system_clock;
- using TimePoint = Clock::time_point;
- using Duration = Clock::duration;
- using Tick = int64_t;
-
- static Tick TickCount() { return Now().time_since_epoch().count(); }
- static TimePoint Now() { return Clock::now(); }
- static TimePoint TimePointFromTick(const Tick TickCount) { return TimePoint{Duration{TickCount}}; }
-};
-
-/** Garbage Collection context object
- */
-class GcContext
-{
-public:
- GcContext(const GcClock::TimePoint& ExpireTime);
- ~GcContext();
-
- void AddRetainedCids(std::span<const IoHash> Cid);
- void SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys);
-
- void IterateCids(std::function<void(const IoHash&)> Callback);
-
- void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc);
- void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc);
-
- void AddDeletedCids(std::span<const IoHash> Cas);
- const HashKeySet& DeletedCids();
-
- std::span<const IoHash> ExpiredCacheKeys(const std::string& CacheKeyContext) const;
-
- bool IsDeletionMode() const;
- void SetDeletionMode(bool NewState);
-
- bool CollectSmallObjects() const;
- void CollectSmallObjects(bool NewState);
-
- GcClock::TimePoint ExpireTime() const;
-
- void DiskReservePath(const std::filesystem::path& Path);
- uint64_t ClaimGCReserve();
-
-private:
- struct GcState;
-
- std::unique_ptr<GcState> m_State;
-};
-
-/** GC root contributor
-
- Higher level data structures provide roots for the garbage collector,
- which ultimately determine what is garbage and what data we need to
- retain.
-
- */
-class GcContributor
-{
-public:
- GcContributor(GcManager& Gc);
- ~GcContributor();
-
- virtual void GatherReferences(GcContext& GcCtx) = 0;
-
-protected:
- GcManager& m_Gc;
-};
-
-struct GcStorageSize
-{
- uint64_t DiskSize{};
- uint64_t MemorySize{};
-};
-
-/** GC storage provider
- */
-class GcStorage
-{
-public:
- GcStorage(GcManager& Gc);
- ~GcStorage();
-
- virtual void CollectGarbage(GcContext& GcCtx) = 0;
- virtual GcStorageSize StorageSize() const = 0;
-
-private:
- GcManager& m_Gc;
-};
-
-/** GC orchestrator
- */
-class GcManager
-{
-public:
- GcManager();
- ~GcManager();
-
- void AddGcContributor(GcContributor* Contributor);
- void RemoveGcContributor(GcContributor* Contributor);
-
- void AddGcStorage(GcStorage* Contributor);
- void RemoveGcStorage(GcStorage* Contributor);
-
- void CollectGarbage(GcContext& GcCtx);
-
- GcStorageSize TotalStorageSize() const;
-
-#if ZEN_USE_REF_TRACKING
- void OnNewCidReferences(std::span<IoHash> Hashes);
- void OnCommittedCidReferences(std::span<IoHash> Hashes);
- void OnDroppedCidReferences(std::span<IoHash> Hashes);
-#endif
-
-private:
- spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- mutable RwLock m_Lock;
- std::vector<GcContributor*> m_GcContribs;
- std::vector<GcStorage*> m_GcStorage;
- CidStore* m_CidStore = nullptr;
-};
-
-enum class GcSchedulerStatus : uint32_t
-{
- kIdle,
- kRunning,
- kStopped
-};
-
-struct GcSchedulerConfig
-{
- std::filesystem::path RootDirectory;
- std::chrono::seconds MonitorInterval{30};
- std::chrono::seconds Interval{};
- std::chrono::seconds MaxCacheDuration{86400};
- bool CollectSmallObjects = true;
- bool Enabled = true;
- uint64_t DiskReserveSize = 1ul << 28;
- uint64_t DiskSizeSoftLimit = 0;
-};
-
-class DiskUsageWindow
-{
-public:
- struct DiskUsageEntry
- {
- GcClock::Tick SampleTime;
- uint64_t DiskUsage;
- };
-
- std::vector<DiskUsageEntry> m_LogWindow;
- inline void Append(const DiskUsageEntry& Entry) { m_LogWindow.push_back(Entry); }
- inline void Append(DiskUsageEntry&& Entry) { m_LogWindow.emplace_back(std::move(Entry)); }
- void KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick);
- std::vector<uint64_t> GetDiskDeltas(GcClock::Tick StartTick,
- GcClock::Tick EndTick,
- GcClock::Tick DeltaWidth,
- uint64_t& OutMaxDelta) const;
- GcClock::Tick FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick) const;
-};
-
-/**
- * GC scheduler
- */
-class GcScheduler
-{
-public:
- GcScheduler(GcManager& GcManager);
- ~GcScheduler();
-
- void Initialize(const GcSchedulerConfig& Config);
- void Shutdown();
- GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
-
- struct TriggerParams
- {
- bool CollectSmallObjects = false;
- std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max();
- uint64_t DiskSizeSoftLimit = 0;
- };
-
- bool Trigger(const TriggerParams& Params);
-
-private:
- void SchedulerThread();
- void CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects);
- GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime);
- spdlog::logger& Log() { return m_Log; }
-
- spdlog::logger& m_Log;
- GcManager& m_GcManager;
- GcSchedulerConfig m_Config;
- GcClock::TimePoint m_LastGcTime{};
- GcClock::TimePoint m_LastGcExpireTime{};
- GcClock::TimePoint m_NextGcTime{};
- std::atomic_uint32_t m_Status{};
- std::thread m_GcThread;
- std::mutex m_GcMutex;
- std::condition_variable m_GcSignal;
- std::optional<TriggerParams> m_TriggerParams;
-
- TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog;
- DiskUsageWindow m_DiskUsageWindow;
-};
-
-void gc_forcelink();
-
-} // namespace zen
diff --git a/zenstore/include/zenstore/hashkeyset.h b/zenstore/include/zenstore/hashkeyset.h
deleted file mode 100644
index 411a6256e..000000000
--- a/zenstore/include/zenstore/hashkeyset.h
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "zenstore.h"
-
-#include <zencore/iohash.h>
-
-#include <functional>
-#include <unordered_set>
-
-namespace zen {
-
-/** Manage a set of IoHash values
- */
-
-class HashKeySet
-{
-public:
- void AddHashToSet(const IoHash& HashToAdd);
- void AddHashesToSet(std::span<const IoHash> HashesToAdd);
- void RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
- void IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const;
- [[nodiscard]] inline bool ContainsHash(const IoHash& Hash) const { return m_HashSet.find(Hash) != m_HashSet.end(); }
- [[nodiscard]] inline bool IsEmpty() const { return m_HashSet.empty(); }
- [[nodiscard]] inline size_t GetSize() const { return m_HashSet.size(); }
-
- inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) const
- {
- for (const IoHash& Candidate : Candidates)
- {
- if (ContainsHash(Candidate))
- {
- MatchFunc(Candidate);
- }
- }
- }
-
- inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) const
- {
- for (const IoHash& Candidate : Candidates)
- {
- MatchFunc(Candidate, ContainsHash(Candidate));
- }
- }
-
-private:
- // Q: should we protect this with a lock, or is that a higher level concern?
- std::unordered_set<IoHash, IoHash::Hasher> m_HashSet;
-};
-
-void hashkeyset_forcelink();
-
-} // namespace zen
diff --git a/zenstore/include/zenstore/scrubcontext.h b/zenstore/include/zenstore/scrubcontext.h
deleted file mode 100644
index 0b884fcc6..000000000
--- a/zenstore/include/zenstore/scrubcontext.h
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/timer.h>
-#include <zenstore/hashkeyset.h>
-
-namespace zen {
-
-/** Context object for data scrubbing
- *
- * Data scrubbing is when we traverse stored data to validate it and
- * optionally correct/recover
- */
-
-class ScrubContext
-{
-public:
- virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); }
- inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
- inline bool RunRecovery() const { return m_Recover; }
- void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
- {
- m_ChunkCount.fetch_add(ChunkCount);
- m_ByteCount.fetch_add(ChunkBytes);
- }
-
- inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
- inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
-
- const HashKeySet BadCids() const { return m_BadCid; }
-
-private:
- uint64_t m_ScrubTime = GetHifreqTimerValue();
- bool m_Recover = true;
- std::atomic<uint64_t> m_ChunkCount{0};
- std::atomic<uint64_t> m_ByteCount{0};
- HashKeySet m_BadCid;
-};
-
-} // namespace zen
diff --git a/zenstore/include/zenstore/zenstore.h b/zenstore/include/zenstore/zenstore.h
deleted file mode 100644
index 46d62029d..000000000
--- a/zenstore/include/zenstore/zenstore.h
+++ /dev/null
@@ -1,13 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/zencore.h>
-
-#define ZENSTORE_API
-
-namespace zen {
-
-ZENSTORE_API void zenstore_forcelinktests();
-
-}
diff --git a/zenstore/xmake.lua b/zenstore/xmake.lua
deleted file mode 100644
index 4469c5650..000000000
--- a/zenstore/xmake.lua
+++ /dev/null
@@ -1,9 +0,0 @@
--- Copyright Epic Games, Inc. All Rights Reserved.
-
-target('zenstore')
- set_kind("static")
- add_headerfiles("**.h")
- add_files("**.cpp")
- add_includedirs("include", {public=true})
- add_deps("zencore", "zenutil")
- add_packages("vcpkg::robin-map")
diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp
deleted file mode 100644
index d87652fde..000000000
--- a/zenstore/zenstore.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "zenstore/zenstore.h"
-
-#if ZEN_WITH_TESTS
-
-# include <zenstore/blockstore.h>
-# include <zenstore/gc.h>
-# include <zenstore/hashkeyset.h>
-# include <zenutil/basicfile.h>
-
-# include "cas.h"
-# include "compactcas.h"
-# include "filecas.h"
-
-namespace zen {
-
-void
-zenstore_forcelinktests()
-{
- basicfile_forcelink();
- CAS_forcelink();
- filecas_forcelink();
- blockstore_forcelink();
- compactcas_forcelink();
- gc_forcelink();
- hashkeyset_forcelink();
-}
-
-} // namespace zen
-
-#endif