diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenstore | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 1312 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 355 | ||||
| -rw-r--r-- | src/zenstore/cas.h | 67 | ||||
| -rw-r--r-- | src/zenstore/caslog.cpp | 236 | ||||
| -rw-r--r-- | src/zenstore/cidstore.cpp | 125 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 1511 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 95 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 1452 | ||||
| -rw-r--r-- | src/zenstore/filecas.h | 102 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 1312 | ||||
| -rw-r--r-- | src/zenstore/hashkeyset.cpp | 60 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 175 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/caslog.h | 91 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 87 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 242 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/hashkeyset.h | 54 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/scrubcontext.h | 41 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/zenstore.h | 13 | ||||
| -rw-r--r-- | src/zenstore/xmake.lua | 9 | ||||
| -rw-r--r-- | src/zenstore/zenstore.cpp | 32 |
20 files changed, 7371 insertions, 0 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp new file mode 100644 index 000000000..5dfa10c91 --- /dev/null +++ b/src/zenstore/blockstore.cpp @@ -0,0 +1,1312 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/blockstore.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> + +#include <algorithm> + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <random> +#endif + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +////////////////////////////////////////////////////////////////////////// + +BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath) +{ +} + +BlockStoreFile::~BlockStoreFile() +{ + m_IoBuffer = IoBuffer(); + m_File.Detach(); +} + +const std::filesystem::path& +BlockStoreFile::GetPath() const +{ + return m_Path; +} + +void +BlockStoreFile::Open() +{ + m_File.Open(m_Path, BasicFile::Mode::kDelete); + void* FileHandle = m_File.Handle(); + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize()); +} + +void +BlockStoreFile::Create(uint64_t InitialSize) +{ + auto ParentPath = m_Path.parent_path(); + if (!std::filesystem::is_directory(ParentPath)) + { + CreateDirectories(ParentPath); + } + + m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete); + void* FileHandle = m_File.Handle(); + + // We map our m_IoBuffer beyond the file size as we will grow it over time and want + // to be able to create sub-buffers of all the written range later + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize); +} + +uint64_t +BlockStoreFile::FileSize() +{ + return m_File.FileSize(); +} + +void +BlockStoreFile::MarkAsDeleteOnClose() +{ + m_IoBuffer.MarkAsDeleteOnClose(); +} + +IoBuffer +BlockStoreFile::GetChunk(uint64_t 
Offset, uint64_t Size) +{ + return IoBuffer(m_IoBuffer, Offset, Size); +} + +void +BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset) +{ + m_File.Read(Data, Size, FileOffset); +} + +void +BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) +{ + m_File.Write(Data, Size, FileOffset); +} + +void +BlockStoreFile::Flush() +{ + m_File.Flush(); +} + +BasicFile& +BlockStoreFile::GetBasicFile() +{ + return m_File; +} + +void +BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) +{ + m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); +} + +constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; + +void +BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + const std::vector<BlockStoreLocation>& KnownLocations) +{ + ZEN_ASSERT(MaxBlockSize > 0); + ZEN_ASSERT(MaxBlockCount > 0); + ZEN_ASSERT(IsPow2(MaxBlockCount)); + + m_TotalSize = 0; + m_BlocksBasePath = BlocksBasePath; + m_MaxBlockSize = MaxBlockSize; + + m_ChunkBlocks.clear(); + + std::unordered_set<uint32_t> KnownBlocks; + for (const auto& Entry : KnownLocations) + { + KnownBlocks.insert(Entry.BlockIndex); + } + + if (std::filesystem::is_directory(m_BlocksBasePath)) + { + std::vector<std::filesystem::path> FoldersToScan; + FoldersToScan.push_back(m_BlocksBasePath); + size_t FolderOffset = 0; + while (FolderOffset < FoldersToScan.size()) + { + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) + { + if (Entry.is_directory()) + { + FoldersToScan.push_back(Entry.path()); + continue; + } + if (Entry.is_regular_file()) + { + const std::filesystem::path Path = Entry.path(); + if (Path.extension() != GetBlockFileExtension()) + { + continue; + } + std::string FileName = PathToUtf8(Path.stem()); + uint32_t BlockIndex; + bool OK = ParseHexNumber(FileName, 
BlockIndex); + if (!OK) + { + continue; + } + if (!KnownBlocks.contains(BlockIndex)) + { + // Log removing unreferenced block + // Clear out unused blocks + ZEN_DEBUG("removing unused block at '{}'", Path); + std::error_code Ec; + std::filesystem::remove(Path, Ec); + if (Ec) + { + ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); + } + continue; + } + Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)}; + BlockFile->Open(); + m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed); + m_ChunkBlocks[BlockIndex] = BlockFile; + } + } + ++FolderOffset; + } + } + else + { + CreateDirectories(m_BlocksBasePath); + } +} + +void +BlockStore::Close() +{ + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + m_WriteBlock = nullptr; + m_CurrentInsertOffset = 0; + m_WriteBlockIndex = 0; + + m_ChunkBlocks.clear(); + m_BlocksBasePath.clear(); +} + +void +BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback) +{ + ZEN_ASSERT(Data != nullptr); + ZEN_ASSERT(Size > 0u); + ZEN_ASSERT(Size <= m_MaxBlockSize); + ZEN_ASSERT(Alignment > 0u); + + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + bool IsWriting = !!m_WriteBlock; + if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize) + { + if (m_WriteBlock) + { + m_WriteBlock = nullptr; + } + + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); + } + + WriteBlockIndex += IsWriting ? 
1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + } + + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + + Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath)); + NewBlockFile->Create(m_MaxBlockSize); + + m_ChunkBlocks[WriteBlockIndex] = NewBlockFile; + m_WriteBlock = NewBlockFile; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } + uint64_t InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); + uint64_t AlignedWriteSize = m_CurrentInsertOffset - InsertOffset; + Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + m_ActiveWriteBlocks.push_back(WriteBlockIndex); + InsertLock.ReleaseNow(); + + WriteBlock->Write(Data, Size, InsertOffset); + m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed); + + Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}); + + { + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); + } +} + +BlockStore::ReclaimSnapshotState +BlockStore::GetReclaimSnapshotState() +{ + ReclaimSnapshotState State; + RwLock::SharedLockScope _(m_InsertLock); + for (uint32_t BlockIndex : m_ActiveWriteBlocks) + { + State.m_ActiveWriteBlocks.insert(BlockIndex); + } + if (m_WriteBlock) + { + State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex); + } + State.BlockCount = m_ChunkBlocks.size(); + return State; +} + +IoBuffer +BlockStore::TryGetChunk(const BlockStoreLocation& Location) const +{ + RwLock::SharedLockScope InsertLock(m_InsertLock); + if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) + { + if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) + { + return Block->GetChunk(Location.Offset, Location.Size); + } + } + return IoBuffer(); +} + 
+void +BlockStore::Flush() +{ + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } +} + +void +BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, + const std::vector<BlockStoreLocation>& ChunkLocations, + const ChunkIndexArray& KeepChunkIndexes, + uint64_t PayloadAlignment, + bool DryRun, + const ReclaimCallback& ChangeCallback, + const ClaimDiskReserveCallback& DiskReserveCallback) +{ + if (ChunkLocations.empty()) + { + return; + } + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = ChunkLocations.size(); + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = 0; + uint64_t NewTotalSize = 0; + + uint64_t MovedCount = 0; + uint64_t DeletedCount = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG( + "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " + "{} " + "of {} " + "chunks ({}).", + m_BlocksBasePath, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedCount, + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + }); + + size_t BlockCount = Snapshot.BlockCount; + if (BlockCount == 0) + { + ZEN_DEBUG("garbage collect for '{}' SKIPPED, no blocks to process", m_BlocksBasePath); + return; + } + + std::unordered_set<size_t> KeepChunkMap; + KeepChunkMap.reserve(KeepChunkIndexes.size()); + for (size_t KeepChunkIndex : KeepChunkIndexes) + { + 
KeepChunkMap.insert(KeepChunkIndex); + } + + std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; + std::vector<ChunkIndexArray> BlockKeepChunks; + std::vector<ChunkIndexArray> BlockDeleteChunks; + + BlockIndexToChunkMapIndex.reserve(BlockCount); + BlockKeepChunks.reserve(BlockCount); + BlockDeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; + + size_t DeleteCount = 0; + for (size_t Index = 0; Index < TotalChunkCount; ++Index) + { + const BlockStoreLocation& Location = ChunkLocations[Index]; + OldTotalSize += Location.Size; + if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex)) + { + continue; + } + + auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); + size_t ChunkMapIndex = 0; + if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) + { + ChunkMapIndex = BlockKeepChunks.size(); + BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; + BlockKeepChunks.resize(ChunkMapIndex + 1); + BlockKeepChunks.back().reserve(GuesstimateCountPerBlock); + BlockDeleteChunks.resize(ChunkMapIndex + 1); + BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock); + } + else + { + ChunkMapIndex = BlockIndexPtr->second; + } + + if (KeepChunkMap.contains(Index)) + { + ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + NewTotalSize += Location.Size; + continue; + } + ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + DeleteCount++; + } + + std::unordered_set<uint32_t> BlocksToReWrite; + BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); + for (const auto& Entry : BlockIndexToChunkMapIndex) + { + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex]; + if (ChunkMap.empty()) + { + continue; + } + BlocksToReWrite.insert(BlockIndex); + } + + if (DryRun) + { + ZEN_DEBUG("garbage collect for '{}' DISABLED, found 
{} {} chunks of total {} {}", + m_BlocksBasePath, + DeleteCount, + NiceBytes(OldTotalSize - NewTotalSize), + TotalChunkCount, + OldTotalSize); + return; + } + + Ref<BlockStoreFile> NewBlockFile; + try + { + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; + for (uint32_t BlockIndex : BlocksToReWrite) + { + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; + + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _i(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + OldBlockFile = m_ChunkBlocks[BlockIndex]; + } + + if (!OldBlockFile) + { + // If the block file pointed to does not exist, move them all to deleted list + BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), + BlockKeepChunks[ChunkMapIndex].begin(), + BlockKeepChunks[ChunkMapIndex].end()); + BlockKeepChunks[ChunkMapIndex].clear(); + } + + const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex]; + if (KeepMap.empty()) + { + const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + DeletedSize += ChunkLocations[DeleteIndex].Size; + } + ChangeCallback({}, DeleteMap); + DeletedCount += DeleteMap.size(); + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (OldBlockFile) + { + m_ChunkBlocks[BlockIndex] = nullptr; + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); + OldBlockFile->MarkAsDeleteOnClose(); + } + } + continue; + } + + ZEN_ASSERT(OldBlockFile); + + 
MovedChunksArray MovedChunks; + std::vector<uint8_t> Chunk; + for (const size_t& ChunkIndex : KeepMap) + { + const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) + { + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + + if (NewBlockFile) + { + NewBlockFile->Flush(); + NewBlockFile = nullptr; + } + { + ChangeCallback(MovedChunks, {}); + MovedCount += KeepMap.size(); + MovedChunks.clear(); + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BlocksBasePath, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; + } + while (m_ChunkBlocks.contains(NextBlockIndex)) + { + NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1); + } + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + } + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); + return; + } + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = DiskReserveCallback(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", + m_BlocksBasePath, + m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + RwLock::ExclusiveLockScope _l(m_InsertLock); + Stopwatch 
Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks.erase(NextBlockIndex); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BlocksBasePath, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; + } + + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); + uint64_t OldOffset = WriteOffset; + WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); + m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed); + } + Chunk.clear(); + if (NewBlockFile) + { + NewBlockFile->Flush(); + NewBlockFile = nullptr; + } + + const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + DeletedSize += ChunkLocations[DeleteIndex].Size; + } + + ChangeCallback(MovedChunks, DeleteMap); + MovedCount += KeepMap.size(); + DeletedCount += DeleteMap.size(); + MovedChunks.clear(); + { + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks[BlockIndex] = nullptr; + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); + OldBlockFile->MarkAsDeleteOnClose(); + } + } + } + catch (std::exception& ex) + { + ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what()); + if (NewBlockFile) + { + ZEN_DEBUG("dropping incomplete 
cas block store file '{}'", NewBlockFile->GetPath()); + m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed); + NewBlockFile->MarkAsDeleteOnClose(); + } + } +} + +void +BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, + const IterateChunksSmallSizeCallback& SmallSizeCallback, + const IterateChunksLargeSizeCallback& LargeSizeCallback) +{ + std::vector<size_t> LocationIndexes; + LocationIndexes.reserve(ChunkLocations.size()); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) + { + LocationIndexes.push_back(ChunkIndex); + } + std::sort(LocationIndexes.begin(), LocationIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { + const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; + const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; + if (LocationA.BlockIndex < LocationB.BlockIndex) + { + return true; + } + else if (LocationA.BlockIndex > LocationB.BlockIndex) + { + return false; + } + return LocationA.Offset < LocationB.Offset; + }); + + IoBuffer ReadBuffer{ScrubSmallChunkWindowSize}; + void* BufferBase = ReadBuffer.MutableData(); + + RwLock::SharedLockScope _(m_InsertLock); + + auto GetNextRange = [&](size_t StartIndexOffset) { + size_t ChunkCount = 0; + size_t StartIndex = LocationIndexes[StartIndexOffset]; + const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; + uint64_t StartOffset = StartLocation.Offset; + while (StartIndexOffset + ChunkCount < LocationIndexes.size()) + { + size_t NextIndex = LocationIndexes[StartIndexOffset + ChunkCount]; + const BlockStoreLocation& Location = ChunkLocations[NextIndex]; + if (Location.BlockIndex != StartLocation.BlockIndex) + { + break; + } + if ((Location.Offset + Location.Size) - StartOffset > ScrubSmallChunkWindowSize) + { + break; + } + ++ChunkCount; + } + return ChunkCount; + }; + + size_t LocationIndexOffset = 0; + while (LocationIndexOffset < LocationIndexes.size()) + { + size_t ChunkIndex = 
LocationIndexes[LocationIndexOffset]; + const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; + + const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[FirstLocation.BlockIndex]; + if (!BlockFile) + { + while (ChunkLocations[ChunkIndex].BlockIndex == FirstLocation.BlockIndex) + { + SmallSizeCallback(ChunkIndex, nullptr, 0); + LocationIndexOffset++; + if (LocationIndexOffset == LocationIndexes.size()) + { + break; + } + ChunkIndex = LocationIndexes[LocationIndexOffset]; + } + continue; + } + size_t BlockSize = BlockFile->FileSize(); + size_t RangeCount = GetNextRange(LocationIndexOffset); + if (RangeCount > 0) + { + size_t LastChunkIndex = LocationIndexes[LocationIndexOffset + RangeCount - 1]; + const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; + uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; + BlockFile->Read(BufferBase, Size, FirstLocation.Offset); + for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) + { + size_t NextChunkIndex = LocationIndexes[LocationIndexOffset + RangeIndex]; + const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; + if (ChunkLocation.Size == 0 || (ChunkLocation.Offset + ChunkLocation.Size > BlockSize)) + { + SmallSizeCallback(NextChunkIndex, nullptr, 0); + continue; + } + void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; + SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); + } + LocationIndexOffset += RangeCount; + continue; + } + if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) + { + SmallSizeCallback(ChunkIndex, nullptr, 0); + LocationIndexOffset++; + continue; + } + LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); + LocationIndexOffset++; + } +} + +const char* +BlockStore::GetBlockFileExtension() +{ + return ".ucas"; +} + +std::filesystem::path +BlockStore::GetBlockPath(const std::filesystem::path& 
BlocksBasePath, const uint32_t BlockIndex) +{ + ExtendablePathBuilder<256> Path; + + char BlockHexString[9]; + ToHexNumber(BlockIndex, BlockHexString); + + Path.Append(BlocksBasePath); + Path.AppendSeparator(); + Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); + Path.AppendSeparator(); + Path.Append(BlockHexString); + Path.Append(GetBlockFileExtension()); + return Path.ToPath(); +} + +#if ZEN_WITH_TESTS + +TEST_CASE("blockstore.blockstoredisklocation") +{ + BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0}; + CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4)); + + BlockStoreLocation MaxBlockIndex = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0}; + CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4)); + + BlockStoreLocation MaxOffset = BlockStoreLocation{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; + CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4)); + + BlockStoreLocation MaxSize = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits<uint32_t>::max()}; + CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4)); + + BlockStoreLocation MaxBlockIndexAndOffset = + BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; + CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4)); + + BlockStoreLocation MaxAll = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, + .Offset = BlockStoreDiskLocation::MaxOffset * 4, + .Size = std::numeric_limits<uint32_t>::max()}; + CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4)); + + BlockStoreLocation MaxAll4096 = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, + .Offset = BlockStoreDiskLocation::MaxOffset * 4096, + .Size = std::numeric_limits<uint32_t>::max()}; + CHECK(MaxAll4096 == 
BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096)); + + BlockStoreLocation Middle = BlockStoreLocation{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2, + .Offset = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4, + .Size = std::numeric_limits<uint32_t>::max() / 2}; + CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4)); +} + +TEST_CASE("blockstore.blockfile") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path() / "blocks"; + CreateDirectories(RootDirectory); + + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Create(16384); + CHECK(File1.FileSize() == 0); + File1.Write("data", 5, 0); + IoBuffer DataChunk = File1.GetChunk(0, 5); + File1.Write("boop", 5, 5); + IoBuffer BoopChunk = File1.GetChunk(5, 5); + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + File1.Flush(); + CHECK(File1.FileSize() == 10); + } + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Open(); + + char DataRaw[5]; + File1.Read(DataRaw, 5, 0); + CHECK(std::string(DataRaw) == "data"); + IoBuffer DataChunk = File1.GetChunk(0, 5); + + char BoopRaw[5]; + File1.Read(BoopRaw, 5, 5); + CHECK(std::string(BoopRaw) == "boop"); + + IoBuffer BoopChunk = File1.GetChunk(5, 5); + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + } + + { + IoBuffer DataChunk; + IoBuffer BoopChunk; + + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Open(); + DataChunk = File1.GetChunk(0, 5); + BoopChunk = File1.GetChunk(5, 5); + } + + CHECK(std::filesystem::exists(RootDirectory / "1")); + + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const 
char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + } + CHECK(std::filesystem::exists(RootDirectory / "1")); + + { + IoBuffer DataChunk; + IoBuffer BoopChunk; + + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Open(); + File1.MarkAsDeleteOnClose(); + DataChunk = File1.GetChunk(0, 5); + BoopChunk = File1.GetChunk(5, 5); + } + + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + } + CHECK(!std::filesystem::exists(RootDirectory / "1")); +} + +namespace blockstore::impl { + BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment) + { + BlockStoreLocation Location; + Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; }); + CHECK(Location.Size == String.length()); + return Location; + }; + + std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location) + { + IoBuffer ChunkData = Store.TryGetChunk(Location); + if (!ChunkData) + { + return ""; + } + std::string AsString((const char*)ChunkData.Data(), ChunkData.Size()); + return AsString; + }; + + std::vector<std::filesystem::path> GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories) + { + DirectoryContent DirectoryContent; + GetDirectoryContent(RootDir, + DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) | + (Directories ? 
DirectoryContent::IncludeDirsFlag : 0), + DirectoryContent); + std::vector<std::filesystem::path> Result; + Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end()); + Result.insert(Result.end(), DirectoryContent.Files.begin(), DirectoryContent.Files.end()); + return Result; + }; + + static IoBuffer CreateChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } +} // namespace blockstore::impl + +TEST_CASE("blockstore.chunks") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory, 128, 1024, {}); + IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); + CHECK(!BadChunk); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + + CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); + CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); + + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4); + CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex); + + CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); + 
CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); + CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData); +} + +TEST_CASE("blockstore.clean.stray.blocks") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 128, 1024, {}); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + WriteStringAsChunk(Store, ThirdChunkData, 4); + + Store.Close(); + + // Not referencing the second block means that we should be deleted + Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation}); + + CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1); +} + +TEST_CASE("blockstore.flush.forces.new.block") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 128, 1024, {}); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + WriteStringAsChunk(Store, FirstChunkData, 4); + Store.Flush(); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + WriteStringAsChunk(Store, SecondChunkData, 4); + Store.Flush(); + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + WriteStringAsChunk(Store, ThirdChunkData, 4); + + 
CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3); +} + +TEST_CASE("blockstore.iterate.chunks") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {}); + IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); + CHECK(!BadChunk); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + Store.Flush(); + + std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); + BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); + + BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0}; + BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0, + .Offset = ScrubSmallChunkWindowSize, + .Size = ScrubSmallChunkWindowSize * 2}; + BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; + + Store.IterateChunks( + {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex}, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + switch (ChunkIndex) + { + case 0: + CHECK(Data); + CHECK(Size == FirstChunkData.size()); + CHECK(std::string((const char*)Data, Size) == FirstChunkData); + break; + case 1: + CHECK(Data); + CHECK(Size == SecondChunkData.size()); + CHECK(std::string((const char*)Data, Size) == SecondChunkData); + break; + case 2: + CHECK(false); + break; + case 3: + CHECK(!Data); + break; + case 4: + CHECK(!Data); + break; + case 5: + CHECK(!Data); + break; + default: + 
CHECK(false); + break; + } + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + switch (ChunkIndex) + { + case 0: + case 1: + CHECK(false); + break; + case 2: + { + CHECK(Size == VeryLargeChunk.size()); + char* Buffer = new char[Size]; + size_t HashOffset = 0; + File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { + memcpy(&Buffer[HashOffset], Data, Size); + HashOffset += Size; + }); + CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); + delete[] Buffer; + } + break; + case 3: + CHECK(false); + break; + case 4: + CHECK(false); + break; + case 5: + CHECK(false); + break; + default: + CHECK(false); + break; + } + }); +} + +TEST_CASE("blockstore.reclaim.space") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 512, 1024, {}); + + constexpr size_t ChunkCount = 200; + constexpr size_t Alignment = 8; + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkHashes; + ChunkLocations.reserve(ChunkCount); + ChunkHashes.reserve(ChunkCount); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer Chunk = CreateChunk(57 + ChunkIndex); + + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); }); + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + std::vector<size_t> ChunksToKeep; + ChunksToKeep.reserve(ChunkLocations.size()); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + ChunksToKeep.push_back(ChunkIndex); + } + + Store.Flush(); + BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState(); + Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true); + + // If we keep all the chunks we should not get any callbacks on moved/deleted stuff + Store.ReclaimSpace( + State1, + ChunkLocations, + 
ChunksToKeep, + Alignment, + false, + [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); }, + []() { + CHECK(false); + return 0; + }); + + size_t DeleteChunkCount = 38; + ChunksToKeep.clear(); + for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex) + { + ChunksToKeep.push_back(ChunkIndex); + } + + std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations; + size_t MovedChunkCount = 0; + size_t DeletedChunkCount = 0; + Store.ReclaimSpace( + State1, + ChunkLocations, + ChunksToKeep, + Alignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { + for (const auto& MovedChunk : MovedChunks) + { + CHECK(MovedChunk.first >= DeleteChunkCount); + NewChunkLocations[MovedChunk.first] = MovedChunk.second; + } + MovedChunkCount += MovedChunks.size(); + for (size_t DeletedIndex : DeletedChunks) + { + CHECK(DeletedIndex < DeleteChunkCount); + } + DeletedChunkCount += DeletedChunks.size(); + }, + []() { + CHECK(false); + return 0; + }); + CHECK(MovedChunkCount <= DeleteChunkCount); + CHECK(DeletedChunkCount == DeleteChunkCount); + ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end()); + + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); + if (ChunkIndex >= DeleteChunkCount) + { + IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + } + } + + NewChunkLocations = ChunkLocations; + MovedChunkCount = 0; + DeletedChunkCount = 0; + Store.ReclaimSpace( + State1, + ChunkLocations, + {}, + Alignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { + 
CHECK(MovedChunks.empty()); + DeletedChunkCount += DeletedChunks.size(); + }, + []() { + CHECK(false); + return 0; + }); + CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount); +} + +TEST_CASE("blockstore.thread.read.write") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 1088, 1024, {}); + + constexpr size_t ChunkCount = 1000; + constexpr size_t Alignment = 8; + std::vector<IoBuffer> Chunks; + std::vector<IoHash> ChunkHashes; + Chunks.reserve(ChunkCount); + ChunkHashes.reserve(ChunkCount); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2); + Chunks.push_back(Chunk); + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + std::vector<BlockStoreLocation> ChunkLocations; + ChunkLocations.resize(ChunkCount); + + WorkerThreadPool WorkerPool(8); + std::atomic<size_t> WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + 
std::vector<BlockStoreLocation> SecondChunkLocations; + SecondChunkLocations.resize(ChunkCount); + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + SecondChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }); + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size() * 2) + { + Sleep(1); + } +} + +#endif + +void +blockstore_forcelink() +{ +} + +} // namespace zen diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp new file mode 100644 index 000000000..fdec78c60 --- /dev/null +++ b/src/zenstore/cas.cpp @@ -0,0 +1,355 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include "cas.h" + +#include "compactcas.h" +#include "filecas.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/except.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> +#include <zencore/thread.h> +#include <zencore/trace.h> +#include <zencore/uid.h> +#include <zenstore/cidstore.h> +#include <zenstore/gc.h> +#include <zenstore/scrubcontext.h> + +#include <gsl/gsl-lite.hpp> + +#include <filesystem> +#include <functional> +#include <unordered_map> + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +/** + * CAS store implementation + * + * Uses a basic strategy of splitting payloads by size, to improve ability to reclaim space + * quickly for unused large chunks and to maintain locality for small chunks which are + * frequently accessed together. 
+ * + */ +class CasImpl : public CasStore +{ +public: + CasImpl(GcManager& Gc); + virtual ~CasImpl(); + + virtual void Initialize(const CidStoreConfiguration& InConfig) override; + virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override; + virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; + virtual bool ContainsChunk(const IoHash& ChunkHash) override; + virtual void FilterChunks(HashKeySet& InOutChunks) override; + virtual void Flush() override; + virtual void Scrub(ScrubContext& Ctx) override; + virtual void GarbageCollect(GcContext& GcCtx) override; + virtual CidStoreSize TotalSize() const override; + +private: + CasContainerStrategy m_TinyStrategy; + CasContainerStrategy m_SmallStrategy; + FileCasStrategy m_LargeStrategy; + CbObject m_ManifestObject; + + enum class StorageScheme + { + Legacy = 0, + WithCbManifest = 1 + }; + + StorageScheme m_StorageScheme = StorageScheme::Legacy; + + bool OpenOrCreateManifest(); + void UpdateManifest(); +}; + +CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc) +{ +} + +CasImpl::~CasImpl() +{ +} + +void +CasImpl::Initialize(const CidStoreConfiguration& InConfig) +{ + m_Config = InConfig; + + ZEN_INFO("initializing CAS pool at '{}'", m_Config.RootDirectory); + + // Ensure root directory exists - create if it doesn't exist already + + std::filesystem::create_directories(m_Config.RootDirectory); + + // Open or create manifest + + const bool IsNewStore = OpenOrCreateManifest(); + + // Initialize payload storage + + m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); + m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block + m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block +} + +bool +CasImpl::OpenOrCreateManifest() +{ + bool IsNewStore = false; + + std::filesystem::path ManifestPath = m_Config.RootDirectory; 
+ ManifestPath /= ".ucas_root"; + + std::error_code Ec; + BasicFile ManifestFile; + ManifestFile.Open(ManifestPath.c_str(), BasicFile::Mode::kRead, Ec); + + bool ManifestIsOk = false; + + if (Ec) + { + if (Ec == std::errc::no_such_file_or_directory) + { + IsNewStore = true; + } + } + else + { + IoBuffer ManifestBuffer = ManifestFile.ReadAll(); + ManifestFile.Close(); + + if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#') + { + // Old-style manifest, does not contain any useful information, so we may as well update it + } + else + { + CbObject Manifest{SharedBuffer(ManifestBuffer)}; + CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All); + + if (ValidationResult == CbValidateError::None) + { + if (Manifest["id"]) + { + ManifestIsOk = true; + } + } + else + { + ZEN_WARN("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult)); + } + + if (ManifestIsOk) + { + m_ManifestObject = std::move(Manifest); + } + } + } + + if (!ManifestIsOk) + { + UpdateManifest(); + } + + return IsNewStore; +} + +void +CasImpl::UpdateManifest() +{ + if (!m_ManifestObject) + { + CbObjectWriter Cbo; + Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now(); + m_ManifestObject = Cbo.Save(); + } + + // Write manifest to file + + std::filesystem::path ManifestPath = m_Config.RootDirectory; + ManifestPath /= ".ucas_root"; + + // This will throw on failure + + ZEN_TRACE("Writing new manifest to '{}'", ManifestPath); + + BasicFile Marker; + Marker.Open(ManifestPath.c_str(), BasicFile::Mode::kTruncate); + Marker.Write(m_ManifestObject.GetBuffer(), 0); +} + +CasStore::InsertResult +CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) +{ + ZEN_TRACE_CPU("CAS::InsertChunk"); + + const uint64_t ChunkSize = Chunk.Size(); + + if (ChunkSize < m_Config.TinyValueThreshold) + { + ZEN_ASSERT(ChunkSize); + + return m_TinyStrategy.InsertChunk(Chunk, ChunkHash); + 
} + else if (ChunkSize < m_Config.HugeValueThreshold) + { + return m_SmallStrategy.InsertChunk(Chunk, ChunkHash); + } + + return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode); +} + +IoBuffer +CasImpl::FindChunk(const IoHash& ChunkHash) +{ + ZEN_TRACE_CPU("CAS::FindChunk"); + + if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash)) + { + return Found; + } + + if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash)) + { + return Found; + } + + if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash)) + { + return Found; + } + + // Not found + return IoBuffer{}; +} + +bool +CasImpl::ContainsChunk(const IoHash& ChunkHash) +{ + return m_SmallStrategy.HaveChunk(ChunkHash) || m_TinyStrategy.HaveChunk(ChunkHash) || m_LargeStrategy.HaveChunk(ChunkHash); +} + +void +CasImpl::FilterChunks(HashKeySet& InOutChunks) +{ + m_SmallStrategy.FilterChunks(InOutChunks); + m_TinyStrategy.FilterChunks(InOutChunks); + m_LargeStrategy.FilterChunks(InOutChunks); +} + +void +CasImpl::Flush() +{ + m_SmallStrategy.Flush(); + m_TinyStrategy.Flush(); + m_LargeStrategy.Flush(); +} + +void +CasImpl::Scrub(ScrubContext& Ctx) +{ + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + + m_SmallStrategy.Scrub(Ctx); + m_TinyStrategy.Scrub(Ctx); + m_LargeStrategy.Scrub(Ctx); +} + +void +CasImpl::GarbageCollect(GcContext& GcCtx) +{ + m_SmallStrategy.CollectGarbage(GcCtx); + m_TinyStrategy.CollectGarbage(GcCtx); + m_LargeStrategy.CollectGarbage(GcCtx); +} + +CidStoreSize +CasImpl::TotalSize() const +{ + const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize; + const uint64_t Small = m_SmallStrategy.StorageSize().DiskSize; + const uint64_t Large = m_LargeStrategy.StorageSize().DiskSize; + + return {.TinySize = Tiny, .SmallSize = Small, .LargeSize = Large, .TotalSize = Tiny + Small + Large}; +} + +////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<CasStore> +CreateCasStore(GcManager& Gc) +{ + 
return std::make_unique<CasImpl>(Gc); +} + +////////////////////////////////////////////////////////////////////////// +// +// Testing related code follows... +// + +#if ZEN_WITH_TESTS + +TEST_CASE("CasStore") +{ + ScopedTemporaryDirectory TempDir; + + CidStoreConfiguration config; + config.RootDirectory = TempDir.Path(); + + GcManager Gc; + + std::unique_ptr<CasStore> Store = CreateCasStore(Gc); + Store->Initialize(config); + + ScrubContext Ctx; + Store->Scrub(Ctx); + + IoBuffer Value1{16}; + memcpy(Value1.MutableData(), "1234567890123456", 16); + IoHash Hash1 = IoHash::HashBuffer(Value1.Data(), Value1.Size()); + CasStore::InsertResult Result1 = Store->InsertChunk(Value1, Hash1); + CHECK(Result1.New); + + IoBuffer Value2{16}; + memcpy(Value2.MutableData(), "ABCDEFGHIJKLMNOP", 16); + IoHash Hash2 = IoHash::HashBuffer(Value2.Data(), Value2.Size()); + CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2); + CHECK(Result2.New); + + HashKeySet ChunkSet; + ChunkSet.AddHashToSet(Hash1); + ChunkSet.AddHashToSet(Hash2); + + Store->FilterChunks(ChunkSet); + CHECK(ChunkSet.IsEmpty()); + + IoBuffer Lookup1 = Store->FindChunk(Hash1); + CHECK(Lookup1); + IoBuffer Lookup2 = Store->FindChunk(Hash2); + CHECK(Lookup2); +} + +void +CAS_forcelink() +{ +} + +#endif + +} // namespace zen diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h new file mode 100644 index 000000000..9c48d4707 --- /dev/null +++ b/src/zenstore/cas.h @@ -0,0 +1,67 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
#pragma once

#include <zencore/blake3.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/refcount.h>
#include <zencore/timer.h>
#include <zenstore/cidstore.h>
#include <zenstore/hashkeyset.h>

#include <atomic>
#include <filesystem>
#include <functional>
#include <memory>
#include <string>
#include <unordered_set>

namespace zen {

class GcContext;
class GcManager;
class ScrubContext;

/** Content Addressable Storage interface
 *
 * Chunks are stored and looked up by the IoHash of their content.
 * Concrete implementations are obtained through CreateCasStore().
 */
class CasStore
{
public:
	virtual ~CasStore() = default;

	// Configuration captured by Initialize()
	const CidStoreConfiguration& Config() { return m_Config; }

	struct InsertResult
	{
		bool New = false; // true when the chunk was not already present
	};

	// NOTE(review): semantics inferred from the names - confirm against the
	// FileCasStrategy implementation: kCopyOnly must leave the source buffer
	// untouched, kMayBeMovedInPlace may let the store adopt/move the source.
	enum class InsertMode
	{
		kCopyOnly,
		kMayBeMovedInPlace
	};

	// Opens/creates the on-disk store rooted at Config.RootDirectory
	virtual void Initialize(const CidStoreConfiguration& Config) = 0;
	// Stores Data under ChunkHash; returns whether the chunk was new
	virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0;
	// Returns the chunk for ChunkHash, or an empty IoBuffer when not found
	virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
	virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
	// Removes from InOutChunks every hash already present in the store
	virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
	virtual void Flush() = 0;
	virtual void Scrub(ScrubContext& Ctx) = 0;
	virtual void GarbageCollect(GcContext& GcCtx) = 0;
	virtual CidStoreSize TotalSize() const = 0;

protected:
	CidStoreConfiguration m_Config;
	// Timestamp of the last completed scrub; used to deduplicate scrub requests
	uint64_t m_LastScrubTime = 0;
};

ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc);

// Referenced to force this translation unit to be linked (defined in cas.cpp)
void CAS_forcelink();

} // namespace zen

// ---------------------------------------------------------------------------
// src/zenstore/caslog.cpp
// ---------------------------------------------------------------------------

// Copyright Epic Games, Inc. All Rights Reserved.
+ +#include <zenstore/caslog.h> + +#include "compactcas.h" + +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zencore/uid.h> + +#include <xxhash.h> + +#include <gsl/gsl-lite.hpp> + +#include <filesystem> +#include <functional> + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +uint32_t +CasLogFile::FileHeader::ComputeChecksum() +{ + return XXH32(&this->Magic, sizeof(FileHeader) - 4, 0xC0C0'BABA); +} + +CasLogFile::CasLogFile() +{ +} + +CasLogFile::~CasLogFile() +{ +} + +bool +CasLogFile::IsValid(std::filesystem::path FileName, size_t RecordSize) +{ + if (!std::filesystem::is_regular_file(FileName)) + { + return false; + } + BasicFile File; + + std::error_code Ec; + File.Open(FileName, BasicFile::Mode::kRead, Ec); + if (Ec) + { + return false; + } + + FileHeader Header; + if (File.FileSize() < sizeof(Header)) + { + return false; + } + + // Validate header and log contents and prepare for appending/replay + File.Read(&Header, sizeof Header, 0); + + if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum())) + { + return false; + } + if (Header.RecordSize != RecordSize) + { + return false; + } + return true; +} + +void +CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode) +{ + m_RecordSize = RecordSize; + + std::error_code Ec; + BasicFile::Mode FileMode = BasicFile::Mode::kRead; + switch (Mode) + { + case Mode::kWrite: + FileMode = BasicFile::Mode::kWrite; + break; + case Mode::kTruncate: + FileMode = BasicFile::Mode::kTruncate; + break; + } + + m_File.Open(FileName, FileMode, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to open log file '{}'", FileName)); + } + + uint64_t AppendOffset = 0; + + if ((Mode == Mode::kTruncate) || 
(m_File.FileSize() < sizeof(FileHeader))) + { + if (Mode == Mode::kRead) + { + throw std::runtime_error(fmt::format("Mangled log header (file to small) in '{}'", FileName)); + } + // Initialize log by writing header + FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0}; + memcpy(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic); + Header.Finalize(); + + m_File.Write(&Header, sizeof Header, 0); + + AppendOffset = sizeof(FileHeader); + + m_Header = Header; + } + else + { + FileHeader Header; + m_File.Read(&Header, sizeof Header, 0); + + if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum())) + { + throw std::runtime_error(fmt::format("Mangled log header (invalid header magic) in '{}'", FileName)); + } + if (Header.RecordSize != RecordSize) + { + throw std::runtime_error(fmt::format("Mangled log header (mismatch in record size, expected {}, found {}) in '{}'", + RecordSize, + Header.RecordSize, + FileName)); + } + + AppendOffset = m_File.FileSize(); + + // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended + + AppendOffset -= sizeof Header; + AppendOffset -= AppendOffset % RecordSize; + AppendOffset += sizeof Header; + + m_Header = Header; + } + + m_AppendOffset = AppendOffset; +} + +void +CasLogFile::Close() +{ + // TODO: update header and maybe add trailer + Flush(); + + m_File.Close(); +} + +uint64_t +CasLogFile::GetLogSize() +{ + return m_File.FileSize(); +} + +uint64_t +CasLogFile::GetLogCount() +{ + uint64_t LogFileSize = m_AppendOffset.load(std::memory_order_acquire); + if (LogFileSize < sizeof(FileHeader)) + { + return 0; + } + const uint64_t LogBaseOffset = sizeof(FileHeader); + const size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize; + return LogEntryCount; +} + +void +CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t 
SkipEntryCount) +{ + uint64_t LogFileSize = m_File.FileSize(); + + // Ensure we end up on a clean boundary + uint64_t LogBaseOffset = sizeof(FileHeader); + size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize; + + if (LogEntryCount <= SkipEntryCount) + { + return; + } + + LogBaseOffset += SkipEntryCount * m_RecordSize; + LogEntryCount -= SkipEntryCount; + + // This should really be streaming the data rather than just + // reading it into memory, though we don't tend to get very + // large logs so it may not matter + + const uint64_t LogDataSize = LogEntryCount * m_RecordSize; + + std::vector<uint8_t> ReadBuffer; + ReadBuffer.resize(LogDataSize); + + m_File.Read(ReadBuffer.data(), LogDataSize, LogBaseOffset); + + for (int i = 0; i < int(LogEntryCount); ++i) + { + Handler(ReadBuffer.data() + (i * m_RecordSize)); + } + + m_AppendOffset = LogBaseOffset + (m_RecordSize * LogEntryCount); +} + +void +CasLogFile::Append(const void* DataPointer, uint64_t DataSize) +{ + ZEN_ASSERT((DataSize % m_RecordSize) == 0); + + uint64_t AppendOffset = m_AppendOffset.fetch_add(DataSize); + + std::error_code Ec; + m_File.Write(DataPointer, gsl::narrow<uint32_t>(DataSize), AppendOffset, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to write to log file '{}'", PathFromHandle(m_File.Handle()))); + } +} + +void +CasLogFile::Flush() +{ + m_File.Flush(); +} + +} // namespace zen diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp new file mode 100644 index 000000000..5a5116faf --- /dev/null +++ b/src/zenstore/cidstore.cpp @@ -0,0 +1,125 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
#include "zenstore/cidstore.h"

#include <zencore/compress.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zenstore/scrubcontext.h>

#include "cas.h"

#include <filesystem>

namespace zen {

// Thin pimpl over CasStore: chunks are keyed by the hash of their
// *decompressed* content ("Cid"), while the stored payload is the compressed
// buffer.
struct CidStore::Impl
{
	Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {}

	CasStore& m_CasStore;

	void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); }

	// Stores ChunkData (a compressed buffer) under RawHash
	CidStore::InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, CidStore::InsertMode Mode)
	{
#ifndef NDEBUG
		// Debug-only sanity check: the payload must be a valid compressed
		// buffer whose embedded raw hash matches the caller-supplied RawHash
		IoHash   VerifyRawHash;
		uint64_t _;
		ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHash == VerifyRawHash);
#endif // NDEBUG
		IoBuffer Payload(ChunkData);
		Payload.SetContentType(ZenContentType::kCompressedBinary);

		// CidStore::InsertMode mirrors CasStore::InsertMode value-for-value,
		// so a static_cast is sufficient here
		CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, RawHash, static_cast<CasStore::InsertMode>(Mode));

		return {.New = Result.New};
	}

	IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); }

	bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); }

	// Removes from InOutChunks every hash already present in the store
	void FilterChunks(HashKeySet& InOutChunks)
	{
		InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); });
	}

	void Flush() { m_CasStore.Flush(); }

	void Scrub(ScrubContext& Ctx)
	{
		// Deduplicate scrub requests by timestamp so a scrub pass touches
		// this store at most once
		if (Ctx.ScrubTimestamp() == m_LastScrubTime)
		{
			return;
		}

		m_LastScrubTime = Ctx.ScrubTimestamp();

		m_CasStore.Scrub(Ctx);
	}

	uint64_t m_LastScrubTime = 0; // timestamp of the last completed scrub
};

//////////////////////////////////////////////////////////////////////////

CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore))
{
}

CidStore::~CidStore()
{
}

void
CidStore::Initialize(const CidStoreConfiguration& Config)
{
	m_Impl->Initialize(Config);
}

CidStore::InsertResult
CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode)
{
	return m_Impl->AddChunk(ChunkData, RawHash, Mode);
}

IoBuffer
CidStore::FindChunkByCid(const IoHash& DecompressedId)
{
	return m_Impl->FindChunkByCid(DecompressedId);
}

bool
CidStore::ContainsChunk(const IoHash& DecompressedId)
{
	return m_Impl->ContainsChunk(DecompressedId);
}

void
CidStore::FilterChunks(HashKeySet& InOutChunks)
{
	return m_Impl->FilterChunks(InOutChunks);
}

void
CidStore::Flush()
{
	m_Impl->Flush();
}

void
CidStore::Scrub(ScrubContext& Ctx)
{
	m_Impl->Scrub(Ctx);
}

CidStoreSize
CidStore::TotalSize() const
{
	return m_Impl->m_CasStore.TotalSize();
}

} // namespace zen

// ---------------------------------------------------------------------------
// src/zenstore/compactcas.cpp
// ---------------------------------------------------------------------------

// Copyright Epic Games, Inc. All Rights Reserved.

#include "compactcas.h"

#include "cas.h"

#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zenstore/scrubcontext.h>

#include <gsl/gsl-lite.hpp>

#include <xxhash.h>

#if ZEN_WITH_TESTS
#	include <zencore/compactbinarybuilder.h>
#	include <zencore/testing.h>
#	include <zencore/testutils.h>
#	include <zencore/workthreadpool.h>
#	include <zenstore/cidstore.h>
#	include <algorithm>
#	include <random>
#endif

//////////////////////////////////////////////////////////////////////////

namespace zen {

// On-disk header of the container index file (.uidx)
struct CasDiskIndexHeader
{
	static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'
	static constexpr uint32_t CurrentVersion = 1;

	uint32_t Magic = ExpectedMagic;
	uint32_t Version = CurrentVersion;
	uint64_t EntryCount = 0;
	uint64_t LogPosition = 0;
	uint32_t PayloadAlignment = 0;
	uint32_t Checksum =
0; + + static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } +}; + +static_assert(sizeof(CasDiskIndexHeader) == 32); + +namespace { + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".ulog"; + + std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / ContainerBaseName; + } + + std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); + } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); + } + + std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / "blocks"; + } + + bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); + return false; + } + if (Entry.Flags & CasDiskIndexEntry::kTombstone) + { + return true; + } + if (Entry.ContentType != ZenContentType::kUnknownContentType) + { + OutReason = + fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); + 
return false; + } + uint64_t Size = Entry.Location.GetSize(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + +} // namespace + +////////////////////////////////////////////////////////////////////////// + +CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas")) +{ +} + +CasContainerStrategy::~CasContainerStrategy() +{ +} + +void +CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory, + const std::string_view ContainerBaseName, + uint32_t MaxBlockSize, + uint64_t Alignment, + bool IsNewStore) +{ + ZEN_ASSERT(IsPow2(Alignment)); + ZEN_ASSERT(!m_IsInitialized); + ZEN_ASSERT(MaxBlockSize > 0); + + m_RootDirectory = RootDirectory; + m_ContainerBaseName = ContainerBaseName; + m_PayloadAlignment = Alignment; + m_MaxBlockSize = MaxBlockSize; + m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName); + + OpenContainer(IsNewStore); + + m_IsInitialized = true; +} + +CasStore::InsertResult +CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) +{ + { + RwLock::SharedLockScope _(m_LocationMapLock); + if (m_LocationMap.contains(ChunkHash)) + { + return CasStore::InsertResult{.New = false}; + } + } + + // We can end up in a situation that InsertChunk writes the same chunk data in + // different locations. + // We release the insert lock once we have the correct WriteBlock ready and we know + // where to write the data. If a new InsertChunk request for the same chunk hash/data + // comes in before we update m_LocationMap below we will have a race. + // The outcome of that is that we will write the chunk data in more than one location + // but the chunk hash will only point to one of the chunks. + // We will in that case waste space until the next GC operation. 
+ // + // This should be a rare occasion and the current flow reduces the time we block for + // reads, insert and GC. + + m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) { + BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); + const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; + m_CasLog.Append(IndexEntry); + { + RwLock::ExclusiveLockScope _(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, DiskLocation); + } + }); + + return CasStore::InsertResult{.New = true}; +} + +CasStore::InsertResult +CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) +{ +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); +} + +IoBuffer +CasContainerStrategy::FindChunk(const IoHash& ChunkHash) +{ + RwLock::SharedLockScope _(m_LocationMapLock); + auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) + { + return IoBuffer(); + } + const BlockStoreLocation& Location = KeyIt->second.Get(m_PayloadAlignment); + + IoBuffer Chunk = m_BlockStore.TryGetChunk(Location); + return Chunk; +} + +bool +CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) +{ + RwLock::SharedLockScope _(m_LocationMapLock); + return m_LocationMap.contains(ChunkHash); +} + +void +CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) +{ + // This implementation is good enough for relatively small + // chunk sets (in terms of chunk identifiers), but would + // benefit from a better implementation which removes + // items incrementally for large sets, especially when + // we're likely to already have a large proportion of the + // chunks in the set + + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); +} + +void +CasContainerStrategy::Flush() +{ + m_BlockStore.Flush(); + m_CasLog.Flush(); + MakeIndexSnapshot(); +} + +void 
+CasContainerStrategy::Scrub(ScrubContext& Ctx) +{ + std::vector<IoHash> BadKeys; + uint64_t ChunkCount{0}, ChunkBytes{0}; + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + + RwLock::SharedLockScope _(m_LocationMapLock); + + uint64_t TotalChunkCount = m_LocationMap.size(); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + { + for (const auto& Entry : m_LocationMap) + { + const IoHash& ChunkHash = Entry.first; + const BlockStoreDiskLocation& DiskLocation = Entry.second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash.push_back(ChunkHash); + } + } + + const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + if (!Data) + { + // ChunkLocation out of range of stored blocks + BadKeys.push_back(Hash); + return; + } + + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + if (RawHash != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS + IoHash ComputedHash = IoHash::HashBuffer(Data, Size); + if (ComputedHash == Hash) + { + return; + } +#endif + BadKeys.push_back(Hash); + }; + + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + + IoHash RawHash; + uint64_t RawSize; + // TODO: Add API to verify compressed buffer without having to memorymap the whole file + if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + if (RawHash != Hash) + { + 
// Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS + IoHashStream Hasher; + File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + if (ComputedHash == Hash) + { + return; + } +#endif + BadKeys.push_back(Hash); + }; + + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + + _.ReleaseNow(); + + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + + if (!BadKeys.empty()) + { + ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); + + if (Ctx.RunRecovery()) + { + // Deal with bad chunks by removing them from our lookup map + + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + for (const IoHash& ChunkHash : BadKeys) + { + const auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) + { + // Might have been GC'd + continue; + } + LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); + m_LocationMap.erase(KeyIt); + } + } + m_CasLog.Append(LogEntries); + } + } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCidChunks(BadKeys); + + ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); +} + +void +CasContainerStrategy::CollectGarbage(GcContext& GcCtx) +{ + // It collects all the blocks that we want to delete chunks from. For each such + // block we keep a list of chunks to retain and a list of chunks to delete. + // + // If there is a block that we are currently writing to, that block is omitted + // from the garbage collection. 
+ // + // Next it will iterate over all blocks that we want to remove chunks from. + // If the block is empty after removal of chunks we mark the block as pending + // delete - we want to delete it as soon as there are no IoBuffers using the + // block file. + // Once complete we update the m_LocationMap by removing the chunks. + // + // If the block is non-empty we write out the chunks we want to keep to a new + // block file (creating new block files as needed). + // + // We update the index as we complete each new block file. This makes it possible + // to break the GC if we want to limit time for execution. + // + // GC can very parallell to regular operation - it will block while taking + // a snapshot of the current m_LocationMap state and while moving blocks it will + // do a blocking operation and update the m_LocationMap after each new block is + // written and figuring out the path to the next new block. + + ZEN_DEBUG("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); + + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + + LocationMap_t LocationMap; + BlockStore::ReclaimSnapshotState BlockStoreState; + { + RwLock::SharedLockScope ___(m_LocationMapLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + LocationMap = m_LocationMap; + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + } + + uint64_t TotalChunkCount = LocationMap.size(); + + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : LocationMap) + { + TotalChunkHashes.push_back(Entry.first); + } + + std::vector<BlockStoreLocation> ChunkLocations; + BlockStore::ChunkIndexArray KeepChunkIndexes; + 
std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + + GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = LocationMap.find(ChunkHash); + const BlockStoreDiskLocation& DiskLocation = KeyIt->second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); + + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + if (Keep) + { + KeepChunkIndexes.push_back(ChunkIndex); + } + }); + + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); + return; + } + + std::vector<IoHash> DeletedChunks; + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}}); + } + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash]; + LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone}); + DeletedChunks.push_back(ChunkHash); + } + + m_CasLog.Append(LogEntries); + m_CasLog.Flush(); + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] 
{ + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + for (const CasDiskIndexEntry& Entry : LogEntries) + { + if (Entry.Flags & CasDiskIndexEntry::kTombstone) + { + m_LocationMap.erase(Entry.Key); + continue; + } + m_LocationMap[Entry.Key] = Entry.Location; + } + } + }, + [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); + + GcCtx.AddDeletedCids(DeletedChunks); +} + +void +CasContainerStrategy::MakeIndexSnapshot() +{ + uint64_t LogCount = m_CasLog.GetLogCount(); + if (m_LogFlushPosition == LogCount) + { + return; + } + + ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", + m_RootDirectory / m_ContainerBaseName, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); + fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); + + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(TempIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, TempIndexPath); + } + + try + { + // Write the current state of the location map to a new index state + std::vector<CasDiskIndexEntry> Entries; + + { + RwLock::SharedLockScope ___(m_LocationMapLock); + Entries.resize(m_LocationMap.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_LocationMap) + { + CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second; + } + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CasDiskIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = 
LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + m_LogFlushPosition = LogCount; + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(IndexPath); + fs::rename(TempIndexPath, IndexPath); + } + } + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(TempIndexPath); + } +} + +uint64_t +CasContainerStrategy::ReadIndexFile() +{ + std::vector<CasDiskIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(IndexPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", + IndexPath, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CasDiskIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); + CasDiskIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && + (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) + { + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), 
sizeof(CasDiskIndexHeader)); + m_PayloadAlignment = Header.PayloadAlignment; + + std::string InvalidEntryReason; + for (const CasDiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_LocationMap[Entry.Key] = Entry.Location; + } + + return Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + } + } + return 0; +} + +uint64_t +CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) +{ + std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(LogPath)) + { + size_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", + m_RootDirectory / m_ContainerBaseName, + LogEntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + LogEntryCount = EntryCount - SkipEntryCount; + CasLog.Replay( + [&](const CasDiskIndexEntry& Record) { + LogEntryCount++; + std::string InvalidEntryReason; + if (Record.Flags & CasDiskIndexEntry::kTombstone) + { + m_LocationMap.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + return; + } + m_LocationMap[Record.Key] = Record.Location; + }, + SkipEntryCount); + return LogEntryCount; + } + } + return 0; +} + +void +CasContainerStrategy::OpenContainer(bool IsNewStore) +{ + // Add .running file and delete on clean on close to detect bad termination + + 
    m_LocationMap.clear();

    std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName);

    if (IsNewStore)
    {
        // Fresh store: discard any previous on-disk state
        std::filesystem::remove_all(BasePath);
    }

    // Recover state: index snapshot first, then replay log records written after it
    m_LogFlushPosition = ReadIndexFile();
    uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);

    CreateDirectories(BasePath);

    std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
    m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);

    // Tell the block store which locations are referenced so it can adopt
    // the existing block files
    std::vector<BlockStoreLocation> KnownLocations;
    KnownLocations.reserve(m_LocationMap.size());
    for (const auto& Entry : m_LocationMap)
    {
        const BlockStoreDiskLocation& Location = Entry.second;
        KnownLocations.push_back(Location.Get(m_PayloadAlignment));
    }

    m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);

    if (IsNewStore || (LogEntryCount > 0))
    {
        MakeIndexSnapshot();
    }

    // TODO: should validate integrity of container files here
}

//////////////////////////////////////////////////////////////////////////

#if ZEN_WITH_TESTS

namespace {
    // Returns a buffer of the given size filled with a shuffled 0..255 byte
    // pattern (shared PRNG state; test-only helper)
    static IoBuffer CreateRandomChunk(uint64_t Size)
    {
        static std::random_device rd;
        static std::mt19937 g(rd());

        std::vector<uint8_t> Values;
        Values.resize(Size);
        for (size_t Idx = 0; Idx < Size; ++Idx)
        {
            Values[Idx] = static_cast<uint8_t>(Idx);
        }
        std::shuffle(Values.begin(), Values.end(), g);

        return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
    }
}  // namespace

// Round-trips ToHexNumber/ParseHexNumber over representative 32-bit values
TEST_CASE("compactcas.hex")
{
    uint32_t Value;
    std::string HexString;
    CHECK(!ParseHexNumber("", Value));
    char Hex[9];

    ToHexNumber(0u, Hex);
    HexString = std::string(Hex);
    CHECK(ParseHexNumber(HexString, Value));
    CHECK(Value == 0u);

    ToHexNumber(std::numeric_limits<std::uint32_t>::max(), Hex);
    HexString = std::string(Hex);
    CHECK(HexString == "ffffffff");
    CHECK(ParseHexNumber(HexString, Value));
    CHECK(Value == std::numeric_limits<std::uint32_t>::max());

    ToHexNumber(0xadf14711u, Hex);
    HexString = std::string(Hex);
    CHECK(HexString == "adf14711");
    CHECK(ParseHexNumber(HexString, Value));
    CHECK(Value == 0xadf14711u);

    ToHexNumber(0x80000000u, Hex);
    HexString = std::string(Hex);
    CHECK(HexString == "80000000");
    CHECK(ParseHexNumber(HexString, Value));
    CHECK(Value == 0x80000000u);

    ToHexNumber(0x718293a4u, Hex);
    HexString = std::string(Hex);
    CHECK(HexString == "718293a4");
    CHECK(ParseHexNumber(HexString, Value));
    CHECK(Value == 0x718293a4u);
}

// Inserts compact-binary objects and verifies read-back, both in the original
// store instance and after close/reopen
TEST_CASE("compactcas.compact.gc")
{
    ScopedTemporaryDirectory TempDir;

    const int kIterationCount = 1000;

    std::vector<IoHash> Keys(kIterationCount);

    {
        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);

        for (int i = 0; i < kIterationCount; ++i)
        {
            CbObjectWriter Cbo;
            Cbo << "id" << i;
            CbObject Obj = Cbo.Save();

            IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
            const IoHash Hash = HashBuffer(ObjBuffer);

            Cas.InsertChunk(ObjBuffer, Hash);

            Keys[i] = Hash;
        }

        for (int i = 0; i < kIterationCount; ++i)
        {
            IoBuffer Chunk = Cas.FindChunk(Keys[i]);

            CHECK(!!Chunk);

            CbObject Value = LoadCompactBinaryObject(Chunk);

            CHECK_EQ(Value["id"].AsInt32(), i);
        }
    }

    // Validate that we can still read the inserted data after closing
    // the original cas store

    {
        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);

        for (int i = 0; i < kIterationCount; ++i)
        {
            IoBuffer Chunk = Cas.FindChunk(Keys[i]);

            CHECK(!!Chunk);

            CbObject Value = LoadCompactBinaryObject(Chunk);

            CHECK_EQ(Value["id"].AsInt32(), i);
        }
    }
}

// Checks that reported on-disk size matches the inserted payload bytes across
// repeated reopens (log-only, then snapshot-backed)
TEST_CASE("compactcas.compact.totalsize")
{
    // NOTE(review): rd/g appear unused in this test body (CreateRandomChunk
    // has its own PRNG state)
    std::random_device rd;
    std::mt19937 g(rd());

    // for (uint32_t i = 0; i < 100; ++i)
    {
        ScopedTemporaryDirectory TempDir;

        const uint64_t kChunkSize = 1024;
        const int32_t kChunkCount = 16;

        {
            GcManager Gc;
            CasContainerStrategy Cas(Gc);
            Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);

            for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
            {
                IoBuffer Chunk = CreateRandomChunk(kChunkSize);
                const IoHash Hash = HashBuffer(Chunk);
                CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
                ZEN_ASSERT(InsertResult.New);
            }

            const uint64_t TotalSize = Cas.StorageSize().DiskSize;
            CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
        }

        {
            GcManager Gc;
            CasContainerStrategy Cas(Gc);
            Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);

            const uint64_t TotalSize = Cas.StorageSize().DiskSize;
            CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
        }

        // Re-open again, this time we should have a snapshot
        {
            GcManager Gc;
            CasContainerStrategy Cas(Gc);
            Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);

            const uint64_t TotalSize = Cas.StorageSize().DiskSize;
            CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
        }
    }
}

// A single unreferenced chunk is collected by a full GC pass
TEST_CASE("compactcas.gc.basic")
{
    ScopedTemporaryDirectory TempDir;

    GcManager Gc;
    CasContainerStrategy Cas(Gc);
    Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);

    IoBuffer Chunk = CreateRandomChunk(128);
    IoHash ChunkHash = IoHash::HashBuffer(Chunk);

    const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
    CHECK(InsertResult.New);
    Cas.Flush();

    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
    GcCtx.CollectSmallObjects(true);

    Cas.CollectGarbage(GcCtx);

    CHECK(!Cas.HaveChunk(ChunkHash));
}

// GC after close/reopen removes a chunk inserted (and de-duplicated) by a
// previous store instance
TEST_CASE("compactcas.gc.removefile")
{
    ScopedTemporaryDirectory TempDir;

    IoBuffer Chunk = CreateRandomChunk(128);
    IoHash ChunkHash = IoHash::HashBuffer(Chunk);
    {
        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);

        const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
        CHECK(InsertResult.New);
        const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash);
        CHECK(!InsertResultDup.New);
        Cas.Flush();
    }

    GcManager Gc;
    CasContainerStrategy Cas(Gc);
    Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false);

    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
    GcCtx.CollectSmallObjects(true);

    Cas.CollectGarbage(GcCtx);

    CHECK(!Cas.HaveChunk(ChunkHash));
}

// Repeated GC cycles with varying retained sets, re-inserting the collected
// chunks between rounds, verifying contents via hash round-trips
TEST_CASE("compactcas.gc.compact")
{
    // for (uint32_t i = 0; i < 100; ++i)
    {
        ScopedTemporaryDirectory TempDir;

        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true);

        uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
        std::vector<IoBuffer> Chunks;
        Chunks.reserve(9);
        for (uint64_t Size : ChunkSizes)
        {
            Chunks.push_back(CreateRandomChunk(Size));
        }

        std::vector<IoHash> ChunkHashes;
        ChunkHashes.reserve(9);
        for (const IoBuffer& Chunk : Chunks)
        {
            ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
        }

        CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New);
        CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New);
        CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New);
        CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New);
        CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New);
        CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New);
        CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New);
        CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New);
        CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New);

        CHECK(Cas.HaveChunk(ChunkHashes[0]));
        CHECK(Cas.HaveChunk(ChunkHashes[1]));
        CHECK(Cas.HaveChunk(ChunkHashes[2]));
        CHECK(Cas.HaveChunk(ChunkHashes[3]));
        CHECK(Cas.HaveChunk(ChunkHashes[4]));
        CHECK(Cas.HaveChunk(ChunkHashes[5]));
        CHECK(Cas.HaveChunk(ChunkHashes[6]));
        CHECK(Cas.HaveChunk(ChunkHashes[7]));
        CHECK(Cas.HaveChunk(ChunkHashes[8]));

        // Keep first and last
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);

            std::vector<IoHash> KeepChunks;
            KeepChunks.push_back(ChunkHashes[0]);
            KeepChunks.push_back(ChunkHashes[8]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(Cas.HaveChunk(ChunkHashes[0]));
            CHECK(!Cas.HaveChunk(ChunkHashes[1]));
            CHECK(!Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(!Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(!Cas.HaveChunk(ChunkHashes[6]));
            CHECK(!Cas.HaveChunk(ChunkHashes[7]));
            CHECK(Cas.HaveChunk(ChunkHashes[8]));

            CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
            CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));

            Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
            Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
            Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
            Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
        }

        // Keep last
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector<IoHash> KeepChunks;
            KeepChunks.push_back(ChunkHashes[8]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(!Cas.HaveChunk(ChunkHashes[0]));
            CHECK(!Cas.HaveChunk(ChunkHashes[1]));
            CHECK(!Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(!Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(!Cas.HaveChunk(ChunkHashes[6]));
            CHECK(!Cas.HaveChunk(ChunkHashes[7]));
            CHECK(Cas.HaveChunk(ChunkHashes[8]));

            CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));

            Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
            Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
            Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
            Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
        }

        // Keep mixed
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector<IoHash> KeepChunks;
            KeepChunks.push_back(ChunkHashes[1]);
            KeepChunks.push_back(ChunkHashes[4]);
            KeepChunks.push_back(ChunkHashes[7]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(!Cas.HaveChunk(ChunkHashes[0]));
            CHECK(Cas.HaveChunk(ChunkHashes[1]));
            CHECK(!Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(!Cas.HaveChunk(ChunkHashes[6]));
            CHECK(Cas.HaveChunk(ChunkHashes[7]));
            CHECK(!Cas.HaveChunk(ChunkHashes[8]));

            CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
            CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
            CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));

            Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
            Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
            Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
            Cas.InsertChunk(Chunks[8], ChunkHashes[8]);
        }

        // Keep multiple at end
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector<IoHash> KeepChunks;
            KeepChunks.push_back(ChunkHashes[6]);
            KeepChunks.push_back(ChunkHashes[7]);
            KeepChunks.push_back(ChunkHashes[8]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(!Cas.HaveChunk(ChunkHashes[0]));
            CHECK(!Cas.HaveChunk(ChunkHashes[1]));
            CHECK(!Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(!Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(Cas.HaveChunk(ChunkHashes[6]));
            CHECK(Cas.HaveChunk(ChunkHashes[7]));
            CHECK(Cas.HaveChunk(ChunkHashes[8]));

            CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
            CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
            CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));

            Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
            Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
            Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
        }

        // Keep every other
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector<IoHash> KeepChunks;
            KeepChunks.push_back(ChunkHashes[0]);
            KeepChunks.push_back(ChunkHashes[2]);
            KeepChunks.push_back(ChunkHashes[4]);
            KeepChunks.push_back(ChunkHashes[6]);
            KeepChunks.push_back(ChunkHashes[8]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(Cas.HaveChunk(ChunkHashes[0]));
            CHECK(!Cas.HaveChunk(ChunkHashes[1]));
            CHECK(Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(Cas.HaveChunk(ChunkHashes[6]));
            CHECK(!Cas.HaveChunk(ChunkHashes[7]));
            CHECK(Cas.HaveChunk(ChunkHashes[8]));

            CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
            CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
            CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
            CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
            CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));

            Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
            Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
        }

        // Verify that we nicely appended blocks even after all GC operations
        CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
        CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
        CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
        CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3])));
        CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
        CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5])));
        CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
        CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
        CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
    }
}

// GC removes whole emptied block files; a reopen of the store must still see
// exactly the retained chunks
TEST_CASE("compactcas.gc.deleteblockonopen")
{
    ScopedTemporaryDirectory TempDir;

    uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
    std::vector<IoBuffer> Chunks;
    Chunks.reserve(20);
    for (uint64_t Size : ChunkSizes)
    {
        Chunks.push_back(CreateRandomChunk(Size));
    }

    std::vector<IoHash> ChunkHashes;
    ChunkHashes.reserve(20);
    for (const IoBuffer& Chunk : Chunks)
    {
        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
    }

    {
        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);

        for (size_t i = 0; i < 20; i++)
        {
            CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
        }

        // GC every other block
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector<IoHash> KeepChunks;
            for (size_t i = 0; i < 20; i += 2)
            {
                KeepChunks.push_back(ChunkHashes[i]);
            }
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            for (size_t i = 0; i < 20; i += 2)
            {
                CHECK(Cas.HaveChunk(ChunkHashes[i]));
                CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
                CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
            }
        }
    }
    {
        // Re-open
        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "test", 1024, 16, false);

        for (size_t i = 0; i < 20; i += 2)
        {
            CHECK(Cas.HaveChunk(ChunkHashes[i]));
            CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
            CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
        }
    }
}

// An IoBuffer handed out by FindChunk must remain readable even after GC has
// collected every chunk from the store
TEST_CASE("compactcas.gc.handleopeniobuffer")
{
    ScopedTemporaryDirectory TempDir;

    uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
    std::vector<IoBuffer> Chunks;
    Chunks.reserve(20);
    for (const uint64_t& Size : ChunkSizes)
    {
        Chunks.push_back(CreateRandomChunk(Size));
    }

    std::vector<IoHash> ChunkHashes;
    ChunkHashes.reserve(20);
    for (const IoBuffer& Chunk : Chunks)
    {
        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
    }

    GcManager Gc;
    CasContainerStrategy Cas(Gc);
    Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);

    for (size_t i = 0; i < 20; i++)
    {
        CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
    }

    IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]);
    Cas.Flush();

    // GC everything
    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
    GcCtx.CollectSmallObjects(true);
    Cas.CollectGarbage(GcCtx);

    for (size_t i = 0; i < 20; i++)
    {
        CHECK(!Cas.HaveChunk(ChunkHashes[i]));
    }

    CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk));
}

// Concurrent insert/read/GC stress test (body continues past this chunk)
TEST_CASE("compactcas.threadedinsert")
{
    // for (uint32_t i = 0; i < 100; ++i)
    {
        ScopedTemporaryDirectory TempDir;

        const uint64_t kChunkSize = 1048;
        const int32_t kChunkCount = 4096;
        uint64_t ExpectedSize = 0;

        std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
        Chunks.reserve(kChunkCount);

        for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
        {
            // Loop until a unique chunk (by hash) has been generated
            while (true)
            {
IoBuffer Chunk = CreateRandomChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } + Chunks[Hash] = Chunk; + ExpectedSize += Chunk.Size(); + break; + } + } + + std::atomic<size_t> WorkCompleted = 0; + WorkerThreadPool ThreadPool(4); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); + { + for (const auto& Chunk : Chunks) + { + const IoHash& Hash = Chunk.first; + const IoBuffer& Buffer = Chunk.second; + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); + ZEN_ASSERT(InsertResult.New); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + + WorkCompleted = 0; + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_LE(ExpectedSize, TotalSize); + CHECK_GE(ExpectedSize + 32768, TotalSize); + + { + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Buffer); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + + std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes; + GcChunkHashes.reserve(Chunks.size()); + for (const auto& Chunk : Chunks) + { + GcChunkHashes.insert(Chunk.first); + } + { + WorkCompleted = 0; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks; + NewChunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateRandomChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = Chunk; + } + + std::atomic_uint32_t AddedChunkCount; + + for (const auto& Chunk : NewChunks) + { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { + Cas.InsertChunk(Chunk.second, Chunk.first); + 
AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }); + } + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + if (Buffer) + { + CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); + } + WorkCompleted.fetch_add(1); + }); + } + + while (AddedChunkCount.load() < NewChunks.size()) + { + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + if (Cas.HaveChunk(Chunk.first)) + { + GcChunkHashes.emplace(Chunk.first); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + GcCtx.AddRetainedCids(KeepHashes); + Cas.CollectGarbage(GcCtx); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + while (WorkCompleted < NewChunks.size() + Chunks.size()) + { + Sleep(1); + } + + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + if (Cas.HaveChunk(Chunk.first)) + { + GcChunkHashes.emplace(Chunk.first); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 
1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + GcCtx.AddRetainedCids(KeepHashes); + Cas.CollectGarbage(GcCtx); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + { + WorkCompleted = 0; + for (const IoHash& ChunkHash : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < GcChunkHashes.size()) + { + Sleep(1); + } + } + } +} + +#endif + +void +compactcas_forcelink() +{ +} + +} // namespace zen diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h new file mode 100644 index 000000000..b0c6699eb --- /dev/null +++ b/src/zenstore/compactcas.h @@ -0,0 +1,95 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zencore/zencore.h> +#include <zenstore/blockstore.h> +#include <zenstore/caslog.h> +#include <zenstore/gc.h> + +#include "cas.h" + +#include <atomic> +#include <limits> +#include <unordered_map> + +namespace spdlog { +class logger; +} + +namespace zen { + +////////////////////////////////////////////////////////////////////////// + +#pragma pack(push) +#pragma pack(1) + +struct CasDiskIndexEntry +{ + static const uint8_t kTombstone = 0x01; + + IoHash Key; + BlockStoreDiskLocation Location; + ZenContentType ContentType = ZenContentType::kUnknownContentType; + uint8_t Flags = 0; +}; + +#pragma pack(pop) + +static_assert(sizeof(CasDiskIndexEntry) == 32); + +/** This implements a storage strategy for small CAS values + * + * New chunks are simply appended to a small object file, and an index is + * maintained to allow chunks to be looked up within the active small object + * files + * + */ + +struct CasContainerStrategy final : public GcStorage +{ + CasContainerStrategy(GcManager& Gc); + ~CasContainerStrategy(); + + CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); + IoBuffer FindChunk(const IoHash& ChunkHash); + bool HaveChunk(const IoHash& ChunkHash); + void FilterChunks(HashKeySet& InOutChunks); + void Initialize(const std::filesystem::path& RootDirectory, + const std::string_view ContainerBaseName, + uint32_t MaxBlockSize, + uint64_t Alignment, + bool IsNewStore); + void Flush(); + void Scrub(ScrubContext& Ctx); + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_BlockStore.TotalSize()}; } + +private: + CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(); + uint64_t ReadLog(uint64_t SkipEntryCount); + void OpenContainer(bool IsNewStore); + + spdlog::logger& Log() { return m_Log; } + + std::filesystem::path m_RootDirectory; + 
spdlog::logger& m_Log; + uint64_t m_PayloadAlignment = 1u << 4; + uint64_t m_MaxBlockSize = 1u << 28; + bool m_IsInitialized = false; + TCasLogFile<CasDiskIndexEntry> m_CasLog; + uint64_t m_LogFlushPosition = 0; + std::string m_ContainerBaseName; + std::filesystem::path m_BlocksBasePath; + BlockStore m_BlockStore; + + RwLock m_LocationMapLock; + typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t; + LocationMap_t m_LocationMap; +}; + +void compactcas_forcelink(); + +} // namespace zen diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp new file mode 100644 index 000000000..1d25920c4 --- /dev/null +++ b/src/zenstore/filecas.cpp @@ -0,0 +1,1452 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "filecas.h" + +#include <zencore/compress.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/scopeguard.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> +#include <zencore/thread.h> +#include <zencore/timer.h> +#include <zencore/uid.h> +#include <zenstore/gc.h> +#include <zenstore/scrubcontext.h> +#include <zenutil/basicfile.h> + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +#endif + +#include <gsl/gsl-lite.hpp> + +#include <barrier> +#include <filesystem> +#include <functional> +#include <unordered_map> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <xxhash.h> +#if ZEN_PLATFORM_WINDOWS +# include <atlfile.h> +#endif +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +namespace filecas::impl { + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".ulog"; + + std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", IndexExtension); } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir) + { + return RootDir / fmt::format("cas.tmp{}", 
IndexExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", LogExtension); } + +#pragma pack(push) +#pragma pack(1) + + struct FileCasIndexHeader + { + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t Reserved = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const FileCasIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(FileCasIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; + + static_assert(sizeof(FileCasIndexHeader) == 32); + +#pragma pack(pop) + +} // namespace filecas::impl + +FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) +{ + ShardedPath.Append(RootPath.c_str()); + + ExtendableStringBuilder<64> HashString; + ChunkHash.ToHexString(HashString); + + const char* str = HashString.c_str(); + + // Shard into a path with two directory levels containing 12 bits and 8 bits + // respectively. + // + // This results in a maximum of 4096 * 256 directories + // + // The numbers have been chosen somewhat arbitrarily but are large to scale + // to very large chunk repositories without creating too many directories + // on a single level since NTFS does not deal very well with this. 
+ // + // It may or may not make sense to make this a configurable policy, and it + // would probably be a good idea to measure performance for different + // policies and chunk counts + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str, str + 3); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 3, str + 5); + Shard2len = ShardedPath.Size(); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 5, str + 40); +} + +////////////////////////////////////////////////////////////////////////// + +FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas")) +{ +} + +FileCasStrategy::~FileCasStrategy() +{ +} + +void +FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore) +{ + using namespace filecas::impl; + + m_IsInitialized = true; + + m_RootDirectory = RootDirectory; + + m_Index.clear(); + + std::filesystem::path LogPath = GetLogPath(m_RootDirectory); + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); + + if (IsNewStore) + { + std::filesystem::remove(LogPath); + std::filesystem::remove(IndexPath); + + if (std::filesystem::is_directory(m_RootDirectory)) + { + // We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside + // in this folder as well + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override + { + // We don't care about files + } + static bool IsHexChar(std::filesystem::path::value_type C) + { + return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16]; + } + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& DirectoryName) override + { + if (DirectoryName.length() == 3) + { + if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2])) + { + 
ShardedRoots.push_back(Parent / DirectoryName); + } + } + return false; + } + std::vector<std::filesystem::path> ShardedRoots; + } CasVisitor; + + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); + for (const std::filesystem::path& SharededRoot : CasVisitor.ShardedRoots) + { + std::filesystem::remove_all(SharededRoot); + } + } + } + + m_LogFlushPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); + for (const auto& Entry : m_Index) + { + m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed); + } + + CreateDirectories(m_RootDirectory); + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + if (IsNewStore || LogEntryCount > 0) + { + MakeIndexSnapshot(); + } +} + +#if ZEN_PLATFORM_WINDOWS +static void +DeletePayloadFileOnClose(const void* FileHandle) +{ + const HANDLE WinFileHandle = (const HANDLE)FileHandle; + // This will cause the file to be deleted when the last handle to it is closed + FILE_DISPOSITION_INFO Fdi{}; + Fdi.DeleteFile = TRUE; + BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); + + if (!Success) + { + // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed + // to delete it. 
+ ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'", + PathFromHandle(WinFileHandle), + GetLastErrorAsString()); + } +} +#endif + +CasStore::InsertResult +FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) +{ + ZEN_ASSERT(m_IsInitialized); + +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + + if (Mode == CasStore::InsertMode::kCopyOnly) + { + { + RwLock::SharedLockScope _(m_Lock); + if (m_Index.contains(ChunkHash)) + { + return CasStore::InsertResult{.New = false}; + } + } + return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); + } + + // File-based chunks have special case handling whereby we move the file into + // place in the file store directory, thus avoiding unnecessary copying + + IoBufferFileReference FileRef; + if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) + { + { + bool Exists = true; + { + RwLock::SharedLockScope _(m_Lock); + Exists = m_Index.contains(ChunkHash); + } + if (Exists) + { +#if ZEN_PLATFORM_WINDOWS + DeletePayloadFileOnClose(FileRef.FileHandle); +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle); + if (unlink(FilePath.c_str()) < 0) + { + int UnlinkError = zen::GetLastError(); + if (UnlinkError != ENOENT) + { + ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", + FilePath.string(), + GetSystemErrorAsString(UnlinkError)); + } + } +#endif + return CasStore::InsertResult{.New = false}; + } + } + + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); + + RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); + +#if ZEN_PLATFORM_WINDOWS + const HANDLE ChunkFileHandle = FileRef.FileHandle; + // See if file already exists + { + CAtlFile PayloadFile; + + if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) + { + // If we succeeded 
in opening the target file then we don't need to do anything else because it already exists + // and should contain the content we were about to insert + + // We do need to ensure the source file goes away on close, however + size_t ChunkSize = Chunk.GetSize(); + uint64_t FileSize = 0; + if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) + { + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + + DeletePayloadFileOnClose(ChunkFileHandle); + + return CasStore::InsertResult{.New = IsNew}; + } + else + { + ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", + Name.ShardedPath.ToUtf8(), + ChunkSize, + FileSize); + } + } + else + { + if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) + { + // Shard directory does not exist + } + else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) + { + // Shard directory exists, but not the file + } + else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION)) + { + // Sharing violation, likely because we are trying to open a file + // which has been renamed on another thread, and the file handle + // used to rename it is still open. 
We handle this case below + // instead of here + } + else + { + ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes); + } + } + } + + std::filesystem::path FullPath(Name.ShardedPath.c_str()); + + std::filesystem::path FilePath = FullPath.parent_path(); + std::wstring FileName = FullPath.native(); + + const DWORD BufferSize = sizeof(FILE_RENAME_INFO) + gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR)); + FILE_RENAME_INFO* RenameInfo = reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize)); + memset(RenameInfo, 0, BufferSize); + + RenameInfo->ReplaceIfExists = FALSE; + RenameInfo->FileNameLength = gsl::narrow<DWORD>(FileName.size()); + memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR)); + RenameInfo->FileName[FileName.size()] = 0; + + auto $ = MakeGuard([&] { Memory::Free(RenameInfo); }); + + // Try to move file into place + BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); + + if (!Success) + { + // The rename/move could fail because the target directory does not yet exist. This code attempts + // to create it + + CAtlFile DirHandle; + + auto InternalCreateDirectoryHandle = [&] { + return DirHandle.Create(FilePath.c_str(), + GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS); + }; + + // It's possible for several threads to enter this logic trying to create the same + // directory. 
Only one will create the directory of course, but all threads will + // make it through okay + + HRESULT hRes = InternalCreateDirectoryHandle(); + + if (FAILED(hRes)) + { + // TODO: we can handle directory creation more intelligently and efficiently than + // this currently does + + CreateDirectories(FilePath.c_str()); + + hRes = InternalCreateDirectoryHandle(); + } + + if (FAILED(hRes)) + { + ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath)); + } + + // Retry rename/move + + Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); + } + + if (Success) + { + m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); + + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + + return CasStore::InsertResult{.New = IsNew}; + } + + const DWORD LastError = GetLastError(); + + if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) + { + HashLock.ReleaseNow(); + DeletePayloadFileOnClose(ChunkFileHandle); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + + return CasStore::InsertResult{.New = IsNew}; + } + + ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}", + GetSystemErrorAsString(LastError), + ChunkHash); + + DeletePayloadFileOnClose(ChunkFileHandle); + +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); + std::filesystem::path DestPath = Name.ShardedPath.c_str(); + int Ret = link(SourcePath.c_str(), DestPath.c_str()); + if (Ret < 0 && zen::GetLastError() == 
ENOENT) + { + // Destination directory doesn't exist. Create it any try again. + CreateDirectories(DestPath.parent_path().c_str()); + Ret = link(SourcePath.c_str(), DestPath.c_str()); + } + int LinkError = zen::GetLastError(); + + if (unlink(SourcePath.c_str()) < 0) + { + int UnlinkError = zen::GetLastError(); + if (UnlinkError != ENOENT) + { + ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", + SourcePath.string(), + GetSystemErrorAsString(UnlinkError)); + } + } + + // It is possible that someone beat us to it in linking the file. In that + // case a "file exists" error is okay. All others are not. + if (Ret < 0) + { + if (LinkError == EEXIST) + { + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } + + ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}", + GetSystemErrorAsString(LinkError), + ChunkHash); + } + else + { + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } +#endif // ZEN_PLATFORM_* + } + + return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); +} + +CasStore::InsertResult +FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) +{ + ZEN_ASSERT(m_IsInitialized); + + { + RwLock::SharedLockScope _(m_Lock); + if (m_Index.contains(ChunkHash)) + { + return {.New = false}; + } + } + + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); + + // See if file already exists + +#if ZEN_PLATFORM_WINDOWS + CAtlFile PayloadFile; 
+ + HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + + if (SUCCEEDED(hRes)) + { + // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the + // content we were about to insert + + bool IsNew = false; + { + RwLock::ExclusiveLockScope _(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } + + PayloadFile.Close(); +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + if (access(Name.ShardedPath.c_str(), F_OK) == 0) + { + return CasStore::InsertResult{.New = false}; + } +#endif + + RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); + +#if ZEN_PLATFORM_WINDOWS + // For now, use double-checked locking to see if someone else was first + + hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + + if (SUCCEEDED(hRes)) + { + uint64_t FileSize = 0; + if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) + { + // If we succeeded in opening the file then and the size is correct we don't need to do anything + // else because someone else managed to create the file before we did. Just return. + + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } + else + { + ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. 
Trying to overwrite", + Name.ShardedPath.ToUtf8(), + ChunkSize, + FileSize); + } + } + + if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))) + { + ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes)); + } + + auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; + + hRes = InternalCreateFile(); + + if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) + { + // Ensure parent directories exist and retry file creation + CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len)); + hRes = InternalCreateFile(); + } + + if (FAILED(hRes)) + { + ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8())); + } +#else + // Attempt to exclusively create the file. + auto InternalCreateFile = [&] { + int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); + if (Fd >= 0) + { + fchmod(Fd, 0666); + } + return Fd; + }; + int Fd = InternalCreateFile(); + if (Fd < 0) + { + switch (zen::GetLastError()) + { + case EEXIST: + // Another thread has beat us to it so we're golden. 
+ { + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return {.New = IsNew}; + } + break; + + case ENOENT: + if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len))) + { + Fd = InternalCreateFile(); + if (Fd >= 0) + { + break; + } + } + ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath)); + + default: + ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8())); + } + } + + struct FdWrapper + { + ~FdWrapper() { Close(); } + void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); } + void Close() + { + if (Fd >= 0) + { + close(Fd); + Fd = -1; + } + } + int Fd; + } PayloadFile = {Fd}; +#endif // ZEN_PLATFORM_WINDOWS + + size_t ChunkRemain = ChunkSize; + auto ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData); + + while (ChunkRemain != 0) + { + uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain)); + + PayloadFile.Write(ChunkCursor, ByteCount); + + ChunkCursor += ByteCount; + ChunkRemain -= ByteCount; + } + + // We cannot rely on RAII to close the file handle since it would be closed + // *after* the lock is released due to the initialization order + PayloadFile.Close(); + + m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); + + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + + return {.New = IsNew}; +} + +IoBuffer +FileCasStrategy::FindChunk(const IoHash& ChunkHash) +{ + ZEN_ASSERT(m_IsInitialized); + + { + RwLock::SharedLockScope 
_(m_Lock); + if (!m_Index.contains(ChunkHash)) + { + return {}; + } + } + + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); + + RwLock::SharedLockScope _(LockForHash(ChunkHash)); + + return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); +} + +bool +FileCasStrategy::HaveChunk(const IoHash& ChunkHash) +{ + ZEN_ASSERT(m_IsInitialized); + + RwLock::SharedLockScope _(m_Lock); + return m_Index.contains(ChunkHash); +} + +void +FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) +{ + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); + + uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec)); + if (Ec) + { + ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8()); + FileSize = 0; + } + + ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize)); + std::filesystem::remove(Name.ShardedPath.c_str(), Ec); + + if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str())) + { + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) + { + m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); + m_Index.erase(It); + } + } + m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize}); + } +} + +void +FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) +{ + ZEN_ASSERT(m_IsInitialized); + + // NOTE: it's not a problem now, but in the future if a GC should happen while this + // is in flight, the result could be wrong since chunks could go away in the meantime. 
+ // + // It would be good to have a pinning mechanism to make this less likely but + // given that chunks could go away at any point after the results are returned to + // a caller, this is something which needs to be taken into account by anyone consuming + // this functionality in any case + + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); +} + +void +FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback) +{ + ZEN_ASSERT(m_IsInitialized); + + RwLock::SharedLockScope _(m_Lock); + for (const auto& It : m_Index) + { + const IoHash& NameHash = It.first; + ShardingHelper Name(m_RootDirectory.c_str(), NameHash); + IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + Callback(NameHash, std::move(Payload)); + } +} + +void +FileCasStrategy::Flush() +{ + // Since we don't keep files open after writing there's nothing specific + // to flush here. + // + // Depending on what semantics we want Flush() to provide, it could be + // argued that this should just flush the volume which we are using to + // store the CAS files on here, to ensure metadata is flushed along + // with file data + // + // Related: to facilitate more targeted validation during recovery we could + // maintain a log of when chunks were created +} + +void +FileCasStrategy::Scrub(ScrubContext& Ctx) +{ + ZEN_ASSERT(m_IsInitialized); + + std::vector<IoHash> BadHashes; + uint64_t ChunkCount{0}, ChunkBytes{0}; + + { + std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); + RwLock::ExclusiveLockScope _(m_Lock); + for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) + { + if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed); + m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size}); + } + } + } + + IterateChunks([&](const 
IoHash& Hash, IoBuffer&& Payload) { + if (!Payload) + { + BadHashes.push_back(Hash); + return; + } + ++ChunkCount; + ChunkBytes += Payload.GetSize(); + + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize)) + { + if (RawHash != Hash) + { + // Hash mismatch + BadHashes.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS + IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload)))); + if (ComputedHash == Hash) + { + return; + } +#endif + BadHashes.push_back(Hash); + }); + + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + + if (!BadHashes.empty()) + { + ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size()); + + if (Ctx.RunRecovery()) + { + ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); + + for (const IoHash& Hash : BadHashes) + { + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("failed to delete file for chunk {}", Hash); + } + } + } + } + + // Let whomever it concerns know about the bad chunks. 
This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCidChunks(BadHashes); + + ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); +} + +void +FileCasStrategy::CollectGarbage(GcContext& GcCtx) +{ + ZEN_ASSERT(m_IsInitialized); + + ZEN_DEBUG("collecting garbage from {}", m_RootDirectory); + + std::vector<IoHash> ChunksToDelete; + std::atomic<uint64_t> ChunksToDeleteBytes{0}; + std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; + + std::vector<IoHash> CandidateCas; + CandidateCas.resize(1); + + uint64_t DeletedCount = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", + m_RootDirectory, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + DeletedCount, + ChunkCount, + NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)), + NiceBytes(OldTotalSize)); + }); + + IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { + bool KeepThis = false; + CandidateCas[0] = Hash; + GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { + ZEN_UNUSED(Hash); + KeepThis = true; + }); + + const uint64_t FileSize = Payload.GetSize(); + + if (!KeepThis) + { + ChunksToDelete.push_back(Hash); + ChunksToDeleteBytes.fetch_add(FileSize); + } + + ++ChunkCount; + ChunkBytes.fetch_add(FileSize); + }); + + // TODO, any entires we did not encounter during our IterateChunks should be removed from the index + + if (ChunksToDelete.empty()) + { + ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory); + return; + } + + ZEN_DEBUG("deleting file CAS garbage for '{}': {} out of {} chunks ({})", + m_RootDirectory, + ChunksToDelete.size(), + ChunkCount.load(), + NiceBytes(ChunksToDeleteBytes)); + + if (GcCtx.IsDeletionMode() == false) + { + ZEN_DEBUG("NOTE: not 
actually deleting anything since deletion is disabled"); + + return; + } + + for (const IoHash& Hash : ChunksToDelete) + { + ZEN_TRACE("deleting chunk {}", Hash); + + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message()); + continue; + } + DeletedCount++; + } + + GcCtx.AddDeletedCids(ChunksToDelete); +} + +bool +FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason) +{ + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Flags & (~FileCasIndexEntry::kTombStone)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); + return false; + } + if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Size; + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; +} + +void +FileCasStrategy::MakeIndexSnapshot() +{ + using namespace filecas::impl; + + uint64_t LogCount = m_CasLog.GetLogCount(); + if (m_LogFlushPosition == LogCount) + { + return; + } + ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", + m_RootDirectory, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_RootDirectory); + fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory); + + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, STmpIndexPath); + } + + try + { + // Write the current state of the location 
map to a new index state + std::vector<FileCasIndexEntry> Entries; + + { + Entries.resize(m_Index.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_Index) + { + FileCasIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Size = Entry.second.Size; + } + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + filecas::impl::FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount}; + + Header.Checksum = filecas::impl::FileCasIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(filecas::impl::FileCasIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(filecas::impl::FileCasIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + m_LogFlushPosition = LogCount; + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(IndexPath); + fs::rename(STmpIndexPath, IndexPath); + } + } + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } +} +uint64_t +FileCasStrategy::ReadIndexFile() +{ + using namespace filecas::impl; + + std::vector<FileCasIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); + if (std::filesystem::is_regular_file(IndexPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", + IndexPath, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(FileCasIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(FileCasIndexHeader))) / sizeof(FileCasIndexEntry); + FileCasIndexHeader 
Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) && + (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount)) + { + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader)); + + std::string InvalidEntryReason; + for (const FileCasIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); + } + + return Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + } + return 0; + } + + if (std::filesystem::is_directory(m_RootDirectory)) + { + ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory); + TCasLogFile<FileCasIndexEntry> CasLog; + uint64_t TotalSize = 0; + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}", + m_RootDirectory, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + CasLog.GetLogCount(), + NiceBytes(TotalSize)); + }); + + std::filesystem::path LogPath = GetLogPath(m_RootDirectory); + + std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); + CasLog.Open(LogPath, CasLogFile::Mode::kTruncate); + std::string InvalidEntryReason; + for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); + CasLog.Append(Entry); 
+ } + + CasLog.Close(); + } + + return 0; +} + +uint64_t +FileCasStrategy::ReadLog(uint64_t SkipEntryCount) +{ + using namespace filecas::impl; + + std::filesystem::path LogPath = GetLogPath(m_RootDirectory); + if (std::filesystem::is_regular_file(LogPath)) + { + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<FileCasIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + LogEntryCount = EntryCount - SkipEntryCount; + m_Index.reserve(LogEntryCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const FileCasIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & FileCasIndexEntry::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size}); + }, + SkipEntryCount); + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, LogPath); + } + return LogEntryCount; + } + } + return 0; +} + +std::vector<FileCasStrategy::FileCasIndexEntry> +FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) +{ + using namespace filecas::impl; + + std::vector<FileCasIndexEntry> Entries; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {} + virtual void VisitFile(const 
std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override + { + std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); + + std::filesystem::path::string_type PathString = RelPath.native(); + + if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) + { + if (PathString.at(3) == std::filesystem::path::preferred_separator) + { + PathString.erase(3, 1); + } + PathString.append(File); + + // TODO: should validate that we're actually dealing with a valid hex string here +#if ZEN_PLATFORM_WINDOWS + StringBuilder<64> Utf8; + WideToUtf8(PathString, Utf8); + IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); +#else + IoHash NameHash = IoHash::FromHexString(PathString); +#endif + Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize}); + } + } + + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& DirectoryName) override + { + return true; + } + + const std::filesystem::path& RootDirectory; + std::vector<FileCasIndexEntry>& Entries; + } CasVisitor{RootDir, Entries}; + + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(RootDir, CasVisitor); + return Entries; +}; + + ////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +TEST_CASE("cas.file.move") +{ + // specifying an absolute path here can be helpful when using procmon to dig into things + ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; + + GcManager Gc; + + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); + + { + std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; + + IoBuffer ZeroBytes{1024 * 1024}; + IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes); + + BasicFile PayloadFile; + PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate); + PayloadFile.Write(ZeroBytes, 0); + PayloadFile.Close(); + + IoBuffer Payload1 = 
IoBufferBuilder::MakeFromTemporaryFile(Payload1Path); + + CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash); + CHECK_EQ(Result.New, true); + } + +# if 0 + SUBCASE("stresstest") + { + std::vector<IoHash> PayloadHashes; + + const int kWorkers = 64; + const int kItemCount = 128; + + for (int w = 0; w < kWorkers; ++w) + { + for (int i = 0; i < kItemCount; ++i) + { + IoBuffer Payload{1024}; + *reinterpret_cast<int*>(Payload.MutableData()) = i; + PayloadHashes.push_back(IoHash::HashBuffer(Payload)); + + std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; + WriteFile(PayloadPath, Payload); + } + } + + std::barrier Sync{kWorkers}; + + auto PopulateAll = [&](int w) { + std::vector<IoBuffer> Buffers; + + for (int i = 0; i < kItemCount; ++i) + { + std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; + IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath); + Buffers.push_back(Payload); + Sync.arrive_and_wait(); + CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]); + } + }; + + std::vector<std::jthread> Threads; + + for (int i = 0; i < kWorkers; ++i) + { + Threads.push_back(std::jthread(PopulateAll, i)); + } + + for (std::jthread& Thread : Threads) + { + Thread.join(); + } + } +# endif +} + +TEST_CASE("cas.file.gc") +{ + // specifying an absolute path here can be helpful when using procmon to dig into things + ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; + + GcManager Gc; + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); + + const int kIterationCount = 1000; + std::vector<IoHash> Keys{kIterationCount}; + + auto InsertChunks = [&] { + for (int i = 0; i < kIterationCount; ++i) + { + CbObjectWriter Cbo; + Cbo << "id" << i; + CbObject Obj = Cbo.Save(); + + IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); + IoHash Hash = HashBuffer(ObjBuffer); + + FileCas.InsertChunk(ObjBuffer, 
Hash); + + Keys[i] = Hash; + } + }; + + // Drop everything + + { + InsertChunks(); + + GcContext Ctx(GcClock::Now() - std::chrono::hours(24)); + FileCas.CollectGarbage(Ctx); + + for (const IoHash& Key : Keys) + { + IoBuffer Chunk = FileCas.FindChunk(Key); + + CHECK(!Chunk); + } + } + + // Keep roughly half of the chunks + + { + InsertChunks(); + + GcContext Ctx(GcClock::Now() - std::chrono::hours(24)); + + for (const IoHash& Key : Keys) + { + if (Key.Hash[0] & 1) + { + Ctx.AddRetainedCids(std::vector<IoHash>{Key}); + } + } + + FileCas.CollectGarbage(Ctx); + + for (const IoHash& Key : Keys) + { + if (Key.Hash[0] & 1) + { + CHECK(FileCas.FindChunk(Key)); + } + else + { + CHECK(!FileCas.FindChunk(Key)); + } + } + } +} + +#endif + +void +filecas_forcelink() +{ +} + +} // namespace zen diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h new file mode 100644 index 000000000..420b3a634 --- /dev/null +++ b/src/zenstore/filecas.h @@ -0,0 +1,102 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zencore/zencore.h> + +#include <zencore/filesystem.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/thread.h> +#include <zenstore/caslog.h> +#include <zenstore/gc.h> + +#include "cas.h" + +#include <atomic> +#include <functional> + +namespace spdlog { +class logger; +} + +namespace zen { + +class BasicFile; + +/** CAS storage strategy using a file-per-chunk storage strategy + */ + +struct FileCasStrategy final : public GcStorage +{ + FileCasStrategy(GcManager& Gc); + ~FileCasStrategy(); + + void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore); + CasStore::InsertResult InsertChunk(IoBuffer Chunk, + const IoHash& ChunkHash, + CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace); + IoBuffer FindChunk(const IoHash& ChunkHash); + bool HaveChunk(const IoHash& ChunkHash); + void FilterChunks(HashKeySet& InOutChunks); + void Flush(); + void Scrub(ScrubContext& Ctx); + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; } + +private: + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(); + uint64_t ReadLog(uint64_t LogPosition); + + struct IndexEntry + { + uint64_t Size = 0; + }; + using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>; + + CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); + + std::filesystem::path m_RootDirectory; + RwLock m_Lock; + IndexMap m_Index; + RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } + std::atomic_uint64_t m_TotalSize{}; + bool m_IsInitialized = false; + + struct FileCasIndexEntry + { + static const uint32_t kTombStone = 0x0000'0001; + + bool IsFlagSet(const uint32_t Flag) const { return (Flags & kTombStone) == Flag; } + + IoHash Key; + 
uint32_t Flags = 0; + uint64_t Size = 0; + }; + static bool ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason); + static std::vector<FileCasStrategy::FileCasIndexEntry> ScanFolderForCasFiles(const std::filesystem::path& RootDir); + + static_assert(sizeof(FileCasIndexEntry) == 32); + + TCasLogFile<FileCasIndexEntry> m_CasLog; + uint64_t m_LogFlushPosition = 0; + + inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } + void IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback); + void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec); + + struct ShardingHelper + { + ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash); + + size_t Shard2len = 0; + ExtendablePathBuilder<128> ShardedPath; + }; +}; + +void filecas_forcelink(); + +} // namespace zen diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp new file mode 100644 index 000000000..370c3c965 --- /dev/null +++ b/src/zenstore/gc.cpp @@ -0,0 +1,1312 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include <zenstore/gc.h> + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> +#include <zencore/timer.h> +#include <zenstore/cidstore.h> + +#include "cas.h" + +#include <fmt/format.h> +#include <filesystem> + +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +#else +# include <fcntl.h> +# include <sys/file.h> +# include <sys/stat.h> +# include <unistd.h> +#endif + +#if ZEN_WITH_TESTS +# include <zencore/compress.h> +# include <algorithm> +# include <random> +#endif + +template<> +struct fmt::formatter<zen::GcClock::TimePoint> : formatter<string_view> +{ + template<typename FormatContext> + auto format(const zen::GcClock::TimePoint& TimePoint, FormatContext& ctx) + { + std::time_t Time = std::chrono::system_clock::to_time_t(TimePoint); + zen::ExtendableStringBuilder<32> String; + String << std::ctime(&Time); + return formatter<string_view>::format(String.ToView(), ctx); + } +}; + +namespace zen { + +using namespace std::literals; +namespace fs = std::filesystem; + +////////////////////////////////////////////////////////////////////////// + +namespace { + std::error_code CreateGCReserve(const std::filesystem::path& Path, uint64_t Size) + { + if (Size == 0) + { + std::filesystem::remove(Path); + return std::error_code{}; + } + CreateDirectories(Path.parent_path()); + if (std::filesystem::is_regular_file(Path) && std::filesystem::file_size(Path) == Size) + { + return std::error_code(); + } +#if ZEN_PLATFORM_WINDOWS + DWORD dwCreationDisposition = CREATE_ALWAYS; + DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE; + + const DWORD dwShareMode = 0; + const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; + HANDLE hTemplateFile = nullptr; + 
+ HANDLE FileHandle = CreateFile(Path.c_str(), + dwDesiredAccess, + dwShareMode, + /* lpSecurityAttributes */ nullptr, + dwCreationDisposition, + dwFlagsAndAttributes, + hTemplateFile); + + if (FileHandle == INVALID_HANDLE_VALUE) + { + return MakeErrorCodeFromLastError(); + } + bool Keep = true; + auto _ = MakeGuard([&]() { + ::CloseHandle(FileHandle); + if (!Keep) + { + ::DeleteFile(Path.c_str()); + } + }); + LARGE_INTEGER liFileSize; + liFileSize.QuadPart = Size; + BOOL OK = ::SetFilePointerEx(FileHandle, liFileSize, 0, FILE_BEGIN); + if (!OK) + { + return MakeErrorCodeFromLastError(); + } + OK = ::SetEndOfFile(FileHandle); + if (!OK) + { + return MakeErrorCodeFromLastError(); + } + Keep = true; +#else + int OpenFlags = O_CLOEXEC | O_RDWR | O_CREAT; + int Fd = open(Path.c_str(), OpenFlags, 0666); + if (Fd < 0) + { + return MakeErrorCodeFromLastError(); + } + + bool Keep = true; + auto _ = MakeGuard([&]() { + close(Fd); + if (!Keep) + { + unlink(Path.c_str()); + } + }); + + if (fchmod(Fd, 0666) < 0) + { + return MakeErrorCodeFromLastError(); + } + +# if ZEN_PLATFORM_MAC + if (ftruncate(Fd, (off_t)Size) < 0) + { + return MakeErrorCodeFromLastError(); + } +# else + if (ftruncate64(Fd, (off64_t)Size) < 0) + { + return MakeErrorCodeFromLastError(); + } + int Error = posix_fallocate64(Fd, 0, (off64_t)Size); + if (Error) + { + return MakeErrorCode(Error); + } +# endif + Keep = true; +#endif + return std::error_code{}; + } + +} // namespace + +////////////////////////////////////////////////////////////////////////// + +CbObject +LoadCompactBinaryObject(const fs::path& Path) +{ + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); +} + +void +SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) +{ + 
WriteFile(Path, Object.GetBuffer().AsIoBuffer()); +} + +////////////////////////////////////////////////////////////////////////// + +struct GcContext::GcState +{ + using CacheKeyContexts = std::unordered_map<std::string, std::vector<IoHash>>; + + CacheKeyContexts m_ExpiredCacheKeys; + HashKeySet m_RetainedCids; + HashKeySet m_DeletedCids; + GcClock::TimePoint m_ExpireTime; + bool m_DeletionMode = true; + bool m_CollectSmallObjects = false; + + std::filesystem::path DiskReservePath; +}; + +GcContext::GcContext(const GcClock::TimePoint& ExpireTime) : m_State(std::make_unique<GcState>()) +{ + m_State->m_ExpireTime = ExpireTime; +} + +GcContext::~GcContext() +{ +} + +void +GcContext::AddRetainedCids(std::span<const IoHash> Cids) +{ + m_State->m_RetainedCids.AddHashesToSet(Cids); +} + +void +GcContext::SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys) +{ + m_State->m_ExpiredCacheKeys[CacheKeyContext] = std::move(ExpiredKeys); +} + +void +GcContext::IterateCids(std::function<void(const IoHash&)> Callback) +{ + m_State->m_RetainedCids.IterateHashes([&](const IoHash& Hash) { Callback(Hash); }); +} + +void +GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc) +{ + m_State->m_RetainedCids.FilterHashes(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); +} + +void +GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc) +{ + m_State->m_RetainedCids.FilterHashes(Cid, std::move(FilterFunc)); +} + +void +GcContext::AddDeletedCids(std::span<const IoHash> Cas) +{ + m_State->m_DeletedCids.AddHashesToSet(Cas); +} + +const HashKeySet& +GcContext::DeletedCids() +{ + return m_State->m_DeletedCids; +} + +std::span<const IoHash> +GcContext::ExpiredCacheKeys(const std::string& CacheKeyContext) const +{ + return m_State->m_ExpiredCacheKeys[CacheKeyContext]; +} + +bool +GcContext::IsDeletionMode() const +{ + return m_State->m_DeletionMode; +} + +void 
+GcContext::SetDeletionMode(bool NewState) +{ + m_State->m_DeletionMode = NewState; +} + +bool +GcContext::CollectSmallObjects() const +{ + return m_State->m_CollectSmallObjects; +} + +void +GcContext::CollectSmallObjects(bool NewState) +{ + m_State->m_CollectSmallObjects = NewState; +} + +GcClock::TimePoint +GcContext::ExpireTime() const +{ + return m_State->m_ExpireTime; +} + +void +GcContext::DiskReservePath(const std::filesystem::path& Path) +{ + m_State->DiskReservePath = Path; +} + +uint64_t +GcContext::ClaimGCReserve() +{ + if (!std::filesystem::is_regular_file(m_State->DiskReservePath)) + { + return 0; + } + uint64_t ReclaimedSize = std::filesystem::file_size(m_State->DiskReservePath); + if (std::filesystem::remove(m_State->DiskReservePath)) + { + return ReclaimedSize; + } + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +GcContributor::GcContributor(GcManager& Gc) : m_Gc(Gc) +{ + m_Gc.AddGcContributor(this); +} + +GcContributor::~GcContributor() +{ + m_Gc.RemoveGcContributor(this); +} + +////////////////////////////////////////////////////////////////////////// + +GcStorage::GcStorage(GcManager& Gc) : m_Gc(Gc) +{ + m_Gc.AddGcStorage(this); +} + +GcStorage::~GcStorage() +{ + m_Gc.RemoveGcStorage(this); +} + +////////////////////////////////////////////////////////////////////////// + +GcManager::GcManager() : m_Log(logging::Get("gc")) +{ +} + +GcManager::~GcManager() +{ +} + +void +GcManager::AddGcContributor(GcContributor* Contributor) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_GcContribs.push_back(Contributor); +} + +void +GcManager::RemoveGcContributor(GcContributor* Contributor) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcContribs, [&](GcContributor* $) { return $ == Contributor; }); +} + +void +GcManager::AddGcStorage(GcStorage* Storage) +{ + ZEN_ASSERT(Storage != nullptr); + RwLock::ExclusiveLockScope _(m_Lock); + m_GcStorage.push_back(Storage); +} + +void 
+GcManager::RemoveGcStorage(GcStorage* Storage) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcStorage, [&](GcStorage* $) { return $ == Storage; }); +} + +void +GcManager::CollectGarbage(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_Lock); + + // First gather reference set + { + Stopwatch Timer; + const auto Guard = MakeGuard([&] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + for (GcContributor* Contributor : m_GcContribs) + { + Contributor->GatherReferences(GcCtx); + } + } + + // Then trim storage + { + GcStorageSize GCTotalSizeDiff; + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_INFO("collected garbage in {}. Removed {} disk space, {} memory", + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + NiceBytes(GCTotalSizeDiff.DiskSize), + NiceBytes(GCTotalSizeDiff.MemorySize)); + }); + for (GcStorage* Storage : m_GcStorage) + { + const auto PreSize = Storage->StorageSize(); + Storage->CollectGarbage(GcCtx); + const auto PostSize = Storage->StorageSize(); + GCTotalSizeDiff.DiskSize += PreSize.DiskSize > PostSize.DiskSize ? PreSize.DiskSize - PostSize.DiskSize : 0; + GCTotalSizeDiff.MemorySize += PreSize.MemorySize > PostSize.MemorySize ? 
PreSize.MemorySize - PostSize.MemorySize : 0; + } + } +} + +GcStorageSize +GcManager::TotalStorageSize() const +{ + RwLock::SharedLockScope _(m_Lock); + + GcStorageSize TotalSize; + + for (GcStorage* Storage : m_GcStorage) + { + const auto Size = Storage->StorageSize(); + TotalSize.DiskSize += Size.DiskSize; + TotalSize.MemorySize += Size.MemorySize; + } + + return TotalSize; +} + +#if ZEN_USE_REF_TRACKING +void +GcManager::OnNewCidReferences(std::span<IoHash> Hashes) +{ + ZEN_UNUSED(Hashes); +} + +void +GcManager::OnCommittedCidReferences(std::span<IoHash> Hashes) +{ + ZEN_UNUSED(Hashes); +} + +void +GcManager::OnDroppedCidReferences(std::span<IoHash> Hashes) +{ + ZEN_UNUSED(Hashes); +} +#endif + +////////////////////////////////////////////////////////////////////////// +void +DiskUsageWindow::KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick) +{ + auto It = m_LogWindow.begin(); + if (It == m_LogWindow.end()) + { + return; + } + while (It->SampleTime < StartTick) + { + ++It; + if (It == m_LogWindow.end()) + { + m_LogWindow.clear(); + return; + } + } + m_LogWindow.erase(m_LogWindow.begin(), It); + + It = m_LogWindow.begin(); + while (It != m_LogWindow.end()) + { + if (It->SampleTime >= EndTick) + { + m_LogWindow.erase(It, m_LogWindow.end()); + return; + } + It++; + } +} + +std::vector<uint64_t> +DiskUsageWindow::GetDiskDeltas(GcClock::Tick StartTick, GcClock::Tick EndTick, GcClock::Tick DeltaWidth, uint64_t& OutMaxDelta) const +{ + ZEN_ASSERT(StartTick != -1); + ZEN_ASSERT(DeltaWidth > 0); + + std::vector<uint64_t> Result; + Result.reserve((EndTick - StartTick + DeltaWidth - 1) / DeltaWidth); + + size_t WindowSize = m_LogWindow.size(); + GcClock::Tick FirstWindowTick = WindowSize < 2 ? 
EndTick : m_LogWindow[1].SampleTime; + + GcClock::Tick RangeStart = StartTick; + while (FirstWindowTick >= RangeStart + DeltaWidth && RangeStart < EndTick) + { + Result.push_back(0); + RangeStart += DeltaWidth; + } + + uint64_t DeltaSum = 0; + size_t WindowIndex = 1; + while (WindowIndex < WindowSize && RangeStart < EndTick) + { + const DiskUsageEntry& Entry = m_LogWindow[WindowIndex]; + if (Entry.SampleTime < RangeStart) + { + ++WindowIndex; + continue; + } + GcClock::Tick RangeEnd = Min(EndTick, RangeStart + DeltaWidth); + ZEN_ASSERT(Entry.SampleTime >= RangeStart); + if (Entry.SampleTime >= RangeEnd) + { + Result.push_back(DeltaSum); + OutMaxDelta = Max(DeltaSum, OutMaxDelta); + DeltaSum = 0; + RangeStart = RangeEnd; + continue; + } + const DiskUsageEntry& PrevEntry = m_LogWindow[WindowIndex - 1]; + if (Entry.DiskUsage > PrevEntry.DiskUsage) + { + uint64_t Delta = Entry.DiskUsage - PrevEntry.DiskUsage; + DeltaSum += Delta; + } + WindowIndex++; + } + + while (RangeStart < EndTick) + { + Result.push_back(DeltaSum); + OutMaxDelta = Max(DeltaSum, OutMaxDelta); + DeltaSum = 0; + RangeStart += DeltaWidth; + } + return Result; +} + +GcClock::Tick +DiskUsageWindow::FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick) const +{ + ZEN_ASSERT(Amount > 0); + uint64_t RemainingToFind = Amount; + size_t Offset = 1; + while (Offset < m_LogWindow.size()) + { + const DiskUsageEntry& Entry = m_LogWindow[Offset]; + if (Entry.SampleTime >= EndTick) + { + return EndTick; + } + const DiskUsageEntry& PreviousEntry = m_LogWindow[Offset - 1]; + uint64_t Delta = Entry.DiskUsage > PreviousEntry.DiskUsage ? 
Entry.DiskUsage - PreviousEntry.DiskUsage : 0; + if (Delta >= RemainingToFind) + { + return m_LogWindow[Offset].SampleTime + 1; + } + RemainingToFind -= Delta; + Offset++; + } + return EndTick; +} + +////////////////////////////////////////////////////////////////////////// + +GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager) +{ +} + +GcScheduler::~GcScheduler() +{ + Shutdown(); +} + +void +GcScheduler::Initialize(const GcSchedulerConfig& Config) +{ + using namespace std::chrono; + + m_Config = Config; + + if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval) + { + m_Config.Interval = m_Config.MonitorInterval; + } + + std::filesystem::create_directories(Config.RootDirectory); + + std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); + if (Ec) + { + ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'", + m_Config.RootDirectory / "reserve.gc", + NiceBytes(m_Config.DiskReserveSize), + Ec.message()); + } + + m_LastGcTime = GcClock::Now(); + m_LastGcExpireTime = GcClock::TimePoint::min(); + + if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state")) + { + m_LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64())); + m_LastGcExpireTime = + GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcExpireTime"].AsInt64(GcClock::Duration::min().count()))); + if (m_LastGcTime + m_Config.Interval < GcClock::Now()) + { + // TODO: Trigger GC? 
// NOTE(review): this span was reflowed from a mashed diff dump; tokens unchanged.
// (fragment: tail of GcScheduler::Initialize — function head not visible in this span)
            m_LastGcTime = GcClock::Now();
        }
    }

    m_DiskUsageLog.Open(m_Config.RootDirectory / "gc.dlog", CasLogFile::Mode::kWrite);
    m_DiskUsageLog.Initialize();
    const GcClock::Tick LastGCTick = m_LastGcTime.time_since_epoch().count();
    // Rebuild the in-memory usage window from the persisted log, keeping only
    // samples taken after the last GC expiry cutoff.
    // NOTE(review): LastGCTick is captured but never used in the lambda — candidate for removal.
    m_DiskUsageLog.Replay(
        [this, LastGCTick](const DiskUsageWindow::DiskUsageEntry& Entry) {
            if (Entry.SampleTime >= m_LastGcExpireTime.time_since_epoch().count())
            {
                m_DiskUsageWindow.Append(Entry);
            }
        },
        0);

    m_NextGcTime = NextGcTime(m_LastGcTime);
    m_GcThread   = std::thread(&GcScheduler::SchedulerThread, this);
}

// Stops the scheduler thread (waiting for an in-flight GC to finish) and
// flushes/closes the disk-usage log. Safe to call more than once.
void
GcScheduler::Shutdown()
{
    if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
    {
        bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning);
        m_Status         = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
        m_GcSignal.notify_one();

        if (m_GcThread.joinable())
        {
            if (GcIsRunning)
            {
                ZEN_INFO("Waiting for garbage collection to complete");
            }
            m_GcThread.join();
        }
    }
    m_DiskUsageLog.Flush();
    m_DiskUsageLog.Close();
}

// Requests an immediate GC run with the given parameters. Returns true only if
// the scheduler was idle and the run was successfully claimed (idle -> running).
bool
GcScheduler::Trigger(const GcScheduler::TriggerParams& Params)
{
    if (m_Config.Enabled)
    {
        std::unique_lock Lock(m_GcMutex);
        if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
        {
            m_TriggerParams    = Params;
            uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
            // CAS guards against a concurrent state change between the check above and here.
            if (m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
            {
                m_GcSignal.notify_one();
                return true;
            }
        }
    }

    return false;
}

// Main loop of the background scheduler thread: periodically samples disk usage,
// decides whether a time-based or disk-pressure-based GC is due, and runs it.
void
GcScheduler::SchedulerThread()
{
    std::chrono::seconds WaitTime{0};

    for (;;)
    {
        bool Timeout = false;
        {
            ZEN_ASSERT(WaitTime.count() >= 0);
            std::unique_lock Lock(m_GcMutex);
            // Timeout == true means we woke up on the monitor interval rather than an explicit signal.
            Timeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, WaitTime);
        }

        if (Status() == GcSchedulerStatus::kStopped)
        {
            break;
        }

        if (!m_Config.Enabled)
        {
            // GC disabled: park until explicitly signalled.
            WaitTime = std::chrono::seconds::max();
            continue;
        }

        if (!Timeout && Status() == GcSchedulerStatus::kIdle)
        {
            // Spurious/administrative wakeup with no work claimed.
            continue;
        }

        // Effective parameters for this pass; an explicit Trigger() may override them.
        bool                 Delete              = true;
        bool                 CollectSmallObjects = m_Config.CollectSmallObjects;
        std::chrono::seconds MaxCacheDuration    = m_Config.MaxCacheDuration;
        uint64_t             DiskSizeSoftLimit   = m_Config.DiskSizeSoftLimit;
        GcClock::TimePoint   Now                 = GcClock::Now();
        if (m_TriggerParams)
        {
            const auto TriggerParams = m_TriggerParams.value();
            m_TriggerParams.reset();

            CollectSmallObjects = TriggerParams.CollectSmallObjects;
            if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max())
            {
                MaxCacheDuration = TriggerParams.MaxCacheDuration;
            }
            if (TriggerParams.DiskSizeSoftLimit != 0)
            {
                DiskSizeSoftLimit = TriggerParams.DiskSizeSoftLimit;
            }
        }

        GcClock::TimePoint ExpireTime = MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration;

        std::error_code     Ec;
        const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();

        // Periodic monitor pass (not an explicit trigger): sample usage and decide
        // whether GC should actually run.
        if (Timeout && Status() == GcSchedulerStatus::kIdle)
        {
            DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
            if (Ec)
            {
                ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message());
            }

            // Build a small ASCII "pressure graph" of write activity over the last
            // PressureGraphLength monitor intervals.
            const int64_t               PressureGraphLength = 30;
            const std::chrono::duration LoadGraphTime       = PressureGraphLength * m_Config.MonitorInterval;
            std::vector<uint64_t>       DiskDeltas;
            uint64_t                    MaxLoad = 0;
            {
                const GcClock::Tick EpochTickCount = GcClock::Now().time_since_epoch().count();
                std::unique_lock    Lock(m_GcMutex);
                m_DiskUsageWindow.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
                m_DiskUsageLog.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
                const GcClock::TimePoint LoadGraphStartTime = Now - LoadGraphTime;
                GcClock::Tick            Start              = LoadGraphStartTime.time_since_epoch().count();
                GcClock::Tick            End                = Now.time_since_epoch().count();
                DiskDeltas = m_DiskUsageWindow.GetDiskDeltas(Start,
                                                             End,
                                                             Max(1, (End - Start + PressureGraphLength - 1) / PressureGraphLength),
                                                             MaxLoad);
            }

            std::string LoadGraph;
            LoadGraph.resize(DiskDeltas.size(), '0');
            if (DiskDeltas.size() > 0 && MaxLoad > 0)
            {
                char LoadIndicator[11] = "0123456789";
                for (size_t Index = 0; Index < DiskDeltas.size(); ++Index)
                {
                    // Ceiling-scale each delta into a single digit 0..9 relative to the peak.
                    size_t LoadIndex = (9 * DiskDeltas[Index] + MaxLoad - 1) / MaxLoad;
                    LoadGraph[Index] = LoadIndicator[LoadIndex];
                }
            }

            // Disk-pressure trigger: if over the soft limit, compute how old data must
            // be to reclaim enough space and tighten the expiry cutoff accordingly.
            uint64_t GcDiskSpaceGoal = 0;
            if (DiskSizeSoftLimit != 0 && TotalSize.DiskSize > DiskSizeSoftLimit)
            {
                GcDiskSpaceGoal = TotalSize.DiskSize - DiskSizeSoftLimit;
                std::unique_lock   Lock(m_GcMutex);
                GcClock::Tick      AgeTick = m_DiskUsageWindow.FindTimepointThatRemoves(GcDiskSpaceGoal, Now.time_since_epoch().count());
                GcClock::TimePoint SizeBasedExpireTime = GcClock::TimePointFromTick(AgeTick);
                if (SizeBasedExpireTime > ExpireTime)
                {
                    ExpireTime = SizeBasedExpireTime;
                }
            }

            bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0;

            // NOTE(review): local name "RemaingTime" is misspelled ("RemainingTime") — cosmetic only.
            std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now());

            if (RemaingTime < std::chrono::seconds::zero())
            {
                RemaingTime = std::chrono::seconds::zero();
            }

            bool TimeBasedGCTriggered = !DiskSpaceGCTriggered && RemaingTime.count() == 0;
            ZEN_INFO(
                "{} in use,{} {} of total {} free disk space, disk writes last {} per {} [{}], peak {}/s. {}",
                NiceBytes(TotalSize.DiskSize),
                DiskSizeSoftLimit == 0 ? "" : fmt::format(" {} soft limit,", NiceBytes(DiskSizeSoftLimit)),
                NiceBytes(Space.Free),
                NiceBytes(Space.Total),
                NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count())),
                NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count() / PressureGraphLength)),
                LoadGraph,
                NiceBytes(MaxLoad * uint64_t(std::chrono::seconds(1).count()) / uint64_t(std::chrono::seconds(LoadGraphTime).count())),
                DiskSpaceGCTriggered ? fmt::format("Disk use threshold triggered, trying to reclaim {}. ", NiceBytes(GcDiskSpaceGoal))
                : TimeBasedGCTriggered ? "GC schedule triggered."
                : m_NextGcTime == GcClock::TimePoint::max()
                    ? ""
                    : fmt::format("{} until next scheduled GC.", NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemaingTime).count()))));

            if (!DiskSpaceGCTriggered && !TimeBasedGCTriggered)
            {
                // Nothing to do: sleep until the next monitor tick or scheduled GC, whichever is sooner.
                WaitTime = m_Config.MonitorInterval < RemaingTime ? m_Config.MonitorInterval : RemaingTime;
                continue;
            }

            WaitTime           = m_Config.MonitorInterval;
            uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
            // Claim the run (idle -> running); lose the race means someone else took it.
            if (!m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
            {
                continue;
            }
        }

        CollectGarbage(ExpireTime, Delete, CollectSmallObjects);

        uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
        if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
        {
            // The only other way out of kRunning is a concurrent Shutdown().
            ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped));
            break;
        }

        WaitTime = m_Config.MonitorInterval;
    }
}

// Returns the next scheduled GC time, or TimePoint::max() when interval-based
// scheduling is disabled (Interval == 0).
GcClock::TimePoint
GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime)
{
    if (m_Config.Interval.count())
    {
        return CurrentTime + m_Config.Interval;
    }
    else
    {
        return GcClock::TimePoint::max();
    }
}

// Runs one garbage collection pass with the given expiry cutoff.
// (continues past this span)
void
GcScheduler::CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects)
{
    GcContext GcCtx(ExpireTime);
    GcCtx.SetDeletionMode(Delete);
    GcCtx.CollectSmallObjects(CollectSmallObjects);
    // GcCtx.MaxCacheDuration(MaxCacheDuration);
    GcCtx.DiskReservePath(m_Config.RootDirectory / "reserve.gc");

    ZEN_INFO("garbage collection STARTING, small objects gc {}, cutoff time {}",
             GcCtx.CollectSmallObjects() ?
"ENABLED"sv : "DISABLED"sv, + ExpireTime); + { + Stopwatch Timer; + const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + m_GcManager.CollectGarbage(GcCtx); + + if (Delete) + { + m_LastGcExpireTime = ExpireTime; + std::unique_lock Lock(m_GcMutex); + m_DiskUsageWindow.KeepRange(ExpireTime.time_since_epoch().count(), GcClock::Duration::max().count()); + } + + m_LastGcTime = GcClock::Now(); + m_NextGcTime = NextGcTime(m_LastGcTime); + + { + const fs::path Path = m_Config.RootDirectory / "gc_state"; + ZEN_DEBUG("saving scheduler state to '{}'", Path); + CbObjectWriter SchedulerState; + SchedulerState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count()); + SchedulerState << "LastGcExpireTime"sv << static_cast<int64_t>(m_LastGcExpireTime.time_since_epoch().count()); + SaveCompactBinaryObject(Path, SchedulerState.Save()); + } + + std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); + if (Ec) + { + ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason: '{}'", + m_Config.RootDirectory / "reserve.gc", + NiceBytes(m_Config.DiskReserveSize), + Ec.message()); + } + } +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +namespace gc::impl { + static IoBuffer CreateChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } + + static CompressedBuffer Compress(IoBuffer Buffer) + { + return CompressedBuffer::Compress(SharedBuffer::MakeView(Buffer.GetData(), Buffer.GetSize())); + } +} // namespace gc::impl + +TEST_CASE("gc.basic") +{ + using namespace gc::impl; + + 
ScopedTemporaryDirectory TempDir; + + CidStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path() / "cas"; + + GcManager Gc; + CidStore CidStore(Gc); + + CidStore.Initialize(CasConfig); + + IoBuffer Chunk = CreateChunk(128); + auto CompressedChunk = Compress(Chunk); + + const auto InsertResult = CidStore.AddChunk(CompressedChunk.GetCompressed().Flatten().AsIoBuffer(), CompressedChunk.DecodeRawHash()); + CHECK(InsertResult.New); + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + + CidStore.Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(!CidStore.ContainsChunk(CompressedChunk.DecodeRawHash())); +} + +TEST_CASE("gc.full") +{ + using namespace gc::impl; + + ScopedTemporaryDirectory TempDir; + + CidStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path() / "cas"; + + GcManager Gc; + std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc); + + CasStore->Initialize(CasConfig); + + uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; + IoBuffer Chunks[9] = {CreateChunk(ChunkSizes[0]), + CreateChunk(ChunkSizes[1]), + CreateChunk(ChunkSizes[2]), + CreateChunk(ChunkSizes[3]), + CreateChunk(ChunkSizes[4]), + CreateChunk(ChunkSizes[5]), + CreateChunk(ChunkSizes[6]), + CreateChunk(ChunkSizes[7]), + CreateChunk(ChunkSizes[8])}; + IoHash ChunkHashes[9] = { + IoHash::HashBuffer(Chunks[0].Data(), Chunks[0].Size()), + IoHash::HashBuffer(Chunks[1].Data(), Chunks[1].Size()), + IoHash::HashBuffer(Chunks[2].Data(), Chunks[2].Size()), + IoHash::HashBuffer(Chunks[3].Data(), Chunks[3].Size()), + IoHash::HashBuffer(Chunks[4].Data(), Chunks[4].Size()), + IoHash::HashBuffer(Chunks[5].Data(), Chunks[5].Size()), + IoHash::HashBuffer(Chunks[6].Data(), Chunks[6].Size()), + IoHash::HashBuffer(Chunks[7].Data(), Chunks[7].Size()), + IoHash::HashBuffer(Chunks[8].Data(), Chunks[8].Size()), + }; + + CasStore->InsertChunk(Chunks[0], ChunkHashes[0]); + CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); + 
CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); + CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); + CasStore->InsertChunk(Chunks[8], ChunkHashes[8]); + + CidStoreSize InitialSize = CasStore->TotalSize(); + + // Keep first and last + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.AddRetainedCids(KeepChunks); + + CasStore->Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(CasStore->ContainsChunk(ChunkHashes[0])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[1])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[4])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[6])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[7])); + CHECK(CasStore->ContainsChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); + } + + CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); + CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); + CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); + + // Keep last + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.AddRetainedCids(KeepChunks); + + CasStore->Flush(); + 
Gc.CollectGarbage(GcCtx); + + CHECK(!CasStore->ContainsChunk(ChunkHashes[0])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[1])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[4])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[6])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[7])); + CHECK(CasStore->ContainsChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); + + CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); + CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); + CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Keep mixed + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[1]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[7]); + GcCtx.AddRetainedCids(KeepChunks); + + CasStore->Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(!CasStore->ContainsChunk(ChunkHashes[0])); + CHECK(CasStore->ContainsChunk(ChunkHashes[1])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); + CHECK(CasStore->ContainsChunk(ChunkHashes[4])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[6])); + CHECK(CasStore->ContainsChunk(ChunkHashes[7])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[1] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[7] == 
IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7]))); + + CasStore->InsertChunk(Chunks[0], ChunkHashes[0]); + CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); + CasStore->InsertChunk(Chunks[8], ChunkHashes[8]); + } + + // Keep multiple at end + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[7]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.AddRetainedCids(KeepChunks); + + CasStore->Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(!CasStore->ContainsChunk(ChunkHashes[0])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[1])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[4])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); + CHECK(CasStore->ContainsChunk(ChunkHashes[6])); + CHECK(CasStore->ContainsChunk(ChunkHashes[7])); + CHECK(CasStore->ContainsChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[6] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); + + CasStore->InsertChunk(Chunks[0], ChunkHashes[0]); + CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); + CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + } + + // Verify that we nicely appended blocks even after all GC operations + CHECK(ChunkHashes[0] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[1] == 
IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[3] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[3]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[5] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[5]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); + + auto FinalSize = CasStore->TotalSize(); + + CHECK_LE(InitialSize.TinySize, FinalSize.TinySize); + CHECK_GE(InitialSize.TinySize + (1u << 28), FinalSize.TinySize); +} + +TEST_CASE("gc.diskusagewindow") +{ + using namespace gc::impl; + + DiskUsageWindow Stats; + Stats.Append({.SampleTime = 0, .DiskUsage = 0}); // 0 0 + Stats.Append({.SampleTime = 10, .DiskUsage = 10}); // 1 10 + Stats.Append({.SampleTime = 20, .DiskUsage = 20}); // 2 10 + Stats.Append({.SampleTime = 30, .DiskUsage = 20}); // 3 0 + Stats.Append({.SampleTime = 40, .DiskUsage = 15}); // 4 0 + Stats.Append({.SampleTime = 50, .DiskUsage = 25}); // 5 10 + Stats.Append({.SampleTime = 60, .DiskUsage = 30}); // 6 5 + Stats.Append({.SampleTime = 70, .DiskUsage = 45}); // 7 15 + + SUBCASE("Truncate start") + { + Stats.KeepRange(-15, 31); + CHECK(Stats.m_LogWindow.size() == 4); + CHECK(Stats.m_LogWindow[0].SampleTime == 0); + CHECK(Stats.m_LogWindow[3].SampleTime == 30); + } + + SUBCASE("Truncate end") + { + Stats.KeepRange(70, 71); + CHECK(Stats.m_LogWindow.size() == 1); + CHECK(Stats.m_LogWindow[0].SampleTime == 70); + } + + SUBCASE("Truncate middle") + { + Stats.KeepRange(29, 69); + CHECK(Stats.m_LogWindow.size() == 4); + CHECK(Stats.m_LogWindow[0].SampleTime == 30); + CHECK(Stats.m_LogWindow[3].SampleTime == 60); + } + + SUBCASE("Full range") + { + uint64_t MaxDelta = 0; 
+ // 0-10, 10-20, 20-30, 30-40, 40-50, 50-60, 60-70, 70-80 + std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(0, 80, 10, MaxDelta); + CHECK(DiskDeltas.size() == 8); + CHECK(MaxDelta == 15); + CHECK(DiskDeltas[0] == 0); + CHECK(DiskDeltas[1] == 10); + CHECK(DiskDeltas[2] == 10); + CHECK(DiskDeltas[3] == 0); + CHECK(DiskDeltas[4] == 0); + CHECK(DiskDeltas[5] == 10); + CHECK(DiskDeltas[6] == 5); + CHECK(DiskDeltas[7] == 15); + } + + SUBCASE("Sub range") + { + uint64_t MaxDelta = 0; + std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(20, 40, 10, MaxDelta); + CHECK(DiskDeltas.size() == 2); + CHECK(MaxDelta == 10); + CHECK(DiskDeltas[0] == 10); // [20:30] + CHECK(DiskDeltas[1] == 0); // [30:40] + } + SUBCASE("Unaligned sub range 1") + { + uint64_t MaxDelta = 0; + std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(21, 51, 10, MaxDelta); + CHECK(DiskDeltas.size() == 3); + CHECK(MaxDelta == 10); + CHECK(DiskDeltas[0] == 0); // [21:31] + CHECK(DiskDeltas[1] == 0); // [31:41] + CHECK(DiskDeltas[2] == 10); // [41:51] + } + SUBCASE("Unaligned end range") + { + uint64_t MaxDelta = 0; + std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(29, 79, 10, MaxDelta); + CHECK(DiskDeltas.size() == 5); + CHECK(MaxDelta == 15); + CHECK(DiskDeltas[0] == 0); // [29:39] + CHECK(DiskDeltas[1] == 0); // [39:49] + CHECK(DiskDeltas[2] == 10); // [49:59] + CHECK(DiskDeltas[3] == 5); // [59:69] + CHECK(DiskDeltas[4] == 15); // [69:79] + } + SUBCASE("Ahead of window") + { + uint64_t MaxDelta = 0; + std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(-40, 0, 10, MaxDelta); + CHECK(DiskDeltas.size() == 4); + CHECK(MaxDelta == 0); + CHECK(DiskDeltas[0] == 0); // [-40:-30] + CHECK(DiskDeltas[1] == 0); // [-30:-20] + CHECK(DiskDeltas[2] == 0); // [-20:-10] + CHECK(DiskDeltas[3] == 0); // [-10:0] + } + SUBCASE("After of window") + { + uint64_t MaxDelta = 0; + std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(90, 120, 10, MaxDelta); + CHECK(DiskDeltas.size() == 3); + CHECK(MaxDelta 
== 0); + CHECK(DiskDeltas[0] == 0); // [90:100] + CHECK(DiskDeltas[1] == 0); // [100:110] + CHECK(DiskDeltas[2] == 0); // [110:120] + } + SUBCASE("Encapsulating window") + { + uint64_t MaxDelta = 0; + std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(-20, 100, 10, MaxDelta); + CHECK(DiskDeltas.size() == 12); + CHECK(MaxDelta == 15); + CHECK(DiskDeltas[0] == 0); // [-20:-10] + CHECK(DiskDeltas[1] == 0); // [ -10:0] + CHECK(DiskDeltas[2] == 0); // [0:10] + CHECK(DiskDeltas[3] == 10); // [10:20] + CHECK(DiskDeltas[4] == 10); // [20:30] + CHECK(DiskDeltas[5] == 0); // [30:40] + CHECK(DiskDeltas[6] == 0); // [40:50] + CHECK(DiskDeltas[7] == 10); // [50:60] + CHECK(DiskDeltas[8] == 5); // [60:70] + CHECK(DiskDeltas[9] == 15); // [70:80] + CHECK(DiskDeltas[10] == 0); // [80:90] + CHECK(DiskDeltas[11] == 0); // [90:100] + } + + SUBCASE("Full range half stride") + { + uint64_t MaxDelta = 0; + std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(0, 80, 20, MaxDelta); + CHECK(DiskDeltas.size() == 4); + CHECK(MaxDelta == 20); + CHECK(DiskDeltas[0] == 10); // [0:20] + CHECK(DiskDeltas[1] == 10); // [20:40] + CHECK(DiskDeltas[2] == 10); // [40:60] + CHECK(DiskDeltas[3] == 20); // [60:80] + } + + SUBCASE("Partial odd stride") + { + uint64_t MaxDelta = 0; + std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(13, 67, 18, MaxDelta); + CHECK(DiskDeltas.size() == 3); + CHECK(MaxDelta == 15); + CHECK(DiskDeltas[0] == 10); // [13:31] + CHECK(DiskDeltas[1] == 0); // [31:49] + CHECK(DiskDeltas[2] == 15); // [49:67] + } + + SUBCASE("Find size window") + { + DiskUsageWindow Empty; + CHECK(Empty.FindTimepointThatRemoves(15u, 10000) == 10000); + + CHECK(Stats.FindTimepointThatRemoves(15u, 40) == 21); + CHECK(Stats.FindTimepointThatRemoves(15u, 20) == 20); + CHECK(Stats.FindTimepointThatRemoves(100000u, 50) == 50); + CHECK(Stats.FindTimepointThatRemoves(100000u, 1000)); + } +} +#endif + +void +gc_forcelink() +{ +} + +} // namespace zen diff --git a/src/zenstore/hashkeyset.cpp 
// (diff-header residue: b/src/zenstore/hashkeyset.cpp new file mode 100644 index 000000000..a5436f5cb --- /dev/null +++ b/src/zenstore/hashkeyset.cpp @@ -0,0 +1,60 @@)
// Copyright Epic Games, Inc. All Rights Reserved.

#include <zenstore/hashkeyset.h>

//////////////////////////////////////////////////////////////////////////

namespace zen {

// Adds a single hash to the set (duplicates are a no-op).
void
HashKeySet::AddHashToSet(const IoHash& HashToAdd)
{
    m_HashSet.insert(HashToAdd);
}

// Bulk-adds a span of hashes to the set.
void
HashKeySet::AddHashesToSet(std::span<const IoHash> HashesToAdd)
{
    m_HashSet.insert(HashesToAdd.begin(), HashesToAdd.end());
}

// Erases every hash for which Predicate returns true. Uses the erase-returns-
// next-iterator idiom so iteration stays valid across removals.
void
HashKeySet::RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
{
    for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd;)
    {
        if (Predicate(*It))
        {
            It = m_HashSet.erase(It);
        }
        else
        {
            ++It;
        }
    }
}

// Invokes Callback for every hash in the set. Callback must not mutate the set.
void
HashKeySet::IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const
{
    for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd; ++It)
    {
        Callback(*It);
    }
}

//////////////////////////////////////////////////////////////////////////
//
// Testing related code follows...
//

#if ZEN_WITH_TESTS

void
hashkeyset_forcelink()
{
}

#endif

}  // namespace zen

// (diff-header residue: diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h new file mode 100644 index 000000000..857ccae38 --- /dev/null +++ b/src/zenstore/include/zenstore/blockstore.h @@ -0,0 +1,175 @@)
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once

#include <zencore/filesystem.h>
#include <zencore/zencore.h>
#include <zenutil/basicfile.h>

#include <unordered_map>
#include <unordered_set>

namespace zen {

//////////////////////////////////////////////////////////////////////////

// Unpacked location of a chunk: which block file it lives in, and its byte
// offset/size within that file.
struct BlockStoreLocation
{
    uint32_t BlockIndex;
    uint64_t Offset;
    uint64_t Size;

    inline auto operator<=>(const BlockStoreLocation& Rhs) const = default;
};

#pragma pack(push)
#pragma pack(1)

// Packed 10-byte on-disk form of BlockStoreLocation:
//  - m_Size:   32-bit chunk size
//  - m_Offset: 48 bits = 20-bit block index | 28-bit aligned offset
// Offsets are stored divided by the caller-supplied OffsetAlignment so 28 bits
// can address a full block.
struct BlockStoreDiskLocation
{
    constexpr static uint32_t MaxBlockIndexBits = 20;
    constexpr static uint32_t MaxOffsetBits     = 28;
    constexpr static uint32_t MaxBlockIndex     = (1ul << BlockStoreDiskLocation::MaxBlockIndexBits) - 1ul;
    constexpr static uint32_t MaxOffset         = (1ul << BlockStoreDiskLocation::MaxOffsetBits) - 1ul;

    BlockStoreDiskLocation(const BlockStoreLocation& Location, uint64_t OffsetAlignment)
    {
        Init(Location.BlockIndex, Location.Offset / OffsetAlignment, Location.Size);
    }

    BlockStoreDiskLocation() = default;

    // Unpacks the stored location, rescaling the offset by OffsetAlignment.
    // NOTE(review): memcpy of the low 6 bytes into a zeroed uint64_t assumes a
    // little-endian layout — confirm if this format must be portable.
    inline BlockStoreLocation Get(uint64_t OffsetAlignment) const
    {
        uint64_t PackedOffset = 0;
        memcpy(&PackedOffset, &m_Offset, sizeof m_Offset);
        return {.BlockIndex = static_cast<std::uint32_t>(PackedOffset >> MaxOffsetBits),
                .Offset     = (PackedOffset & MaxOffset) * OffsetAlignment,
                .Size       = GetSize()};
    }

    inline uint32_t GetBlockIndex() const
    {
        uint64_t PackedOffset = 0;
        memcpy(&PackedOffset, &m_Offset, sizeof m_Offset);
        return static_cast<std::uint32_t>(PackedOffset >> MaxOffsetBits);
    }

    inline uint64_t GetOffset(uint64_t OffsetAlignment) const
    {
        uint64_t PackedOffset = 0;
        memcpy(&PackedOffset, &m_Offset, sizeof m_Offset);
        return (PackedOffset & MaxOffset) * OffsetAlignment;
    }

    inline uint64_t GetSize() const { return m_Size; }

    inline auto operator<=>(const BlockStoreDiskLocation& Rhs) const = default;

private:
    // Packs (BlockIndex, aligned Offset, Size); asserts all fields fit their bit budget.
    inline void Init(uint32_t BlockIndex, uint64_t Offset, uint64_t Size)
    {
        ZEN_ASSERT(BlockIndex <= MaxBlockIndex);
        ZEN_ASSERT(Offset <= MaxOffset);
        ZEN_ASSERT(Size <= std::numeric_limits<std::uint32_t>::max());

        m_Size                = static_cast<uint32_t>(Size);
        uint64_t PackedOffset = (static_cast<uint64_t>(BlockIndex) << MaxOffsetBits) + Offset;
        memcpy(&m_Offset[0], &PackedOffset, sizeof m_Offset);
    }

    uint32_t m_Size;
    uint8_t  m_Offset[6];
};

#pragma pack(pop)

// Ref-counted wrapper around a single on-disk block file with a memory-mapped
// IoBuffer over its contents.
struct BlockStoreFile : public RefCounted
{
    explicit BlockStoreFile(const std::filesystem::path& BlockPath);
    ~BlockStoreFile();
    const std::filesystem::path& GetPath() const;
    void                         Open();
    void                         Create(uint64_t InitialSize);
    void                         MarkAsDeleteOnClose();
    uint64_t                     FileSize();
    IoBuffer                     GetChunk(uint64_t Offset, uint64_t Size);
    void                         Read(void* Data, uint64_t Size, uint64_t FileOffset);
    void                         Write(const void* Data, uint64_t Size, uint64_t FileOffset);
    void                         Flush();
    BasicFile&                   GetBasicFile();
    void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);

private:
    const std::filesystem::path m_Path;
    IoBuffer                    m_IoBuffer;
    BasicFile                   m_File;
};

// Append-oriented store of chunks across a set of block files, with space
// reclamation (compaction) support.
class BlockStore
{
public:
    // Snapshot of which blocks were being written to when reclamation started.
    struct ReclaimSnapshotState
    {
        std::unordered_set<uint32_t> m_ActiveWriteBlocks;
        size_t                       BlockCount;
    };

    typedef std::vector<std::pair<size_t, BlockStoreLocation>> MovedChunksArray;
    typedef std::vector<size_t>                                ChunkIndexArray;

    typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback;
    typedef std::function<uint64_t()>                                                                      ClaimDiskReserveCallback;
    typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)>                        IterateChunksSmallSizeCallback;
    typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)>   IterateChunksLargeSizeCallback;
    typedef std::function<void(const BlockStoreLocation& Location)>                                        WriteChunkCallback;

    void Initialize(const std::filesystem::path&          BlocksBasePath,
                    uint64_t                              MaxBlockSize,
                    uint64_t                              MaxBlockCount,
                    const std::vector<BlockStoreLocation>& KnownLocations);
    void Close();

    void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback);

    IoBuffer TryGetChunk(const BlockStoreLocation& Location) const;
    void     Flush();

    ReclaimSnapshotState GetReclaimSnapshotState();
    void                 ReclaimSpace(
                        const ReclaimSnapshotState&            Snapshot,
                        const std::vector<BlockStoreLocation>& ChunkLocations,
                        const ChunkIndexArray&                 KeepChunkIndexes,
                        uint64_t                               PayloadAlignment,
                        bool                                   DryRun,
                        const ReclaimCallback&          ChangeCallback      = [](const MovedChunksArray&, const ChunkIndexArray&) {},
                        const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; });

    void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
                       const IterateChunksSmallSizeCallback&  SmallSizeCallback,
                       const IterateChunksLargeSizeCallback&  LargeSizeCallback);

    static const char*           GetBlockFileExtension();
    static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);

    inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }

private:
    std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;

    mutable RwLock        m_InsertLock;  // used to serialize inserts
    Ref<BlockStoreFile>   m_WriteBlock;
    std::uint64_t         m_CurrentInsertOffset = 0;
    std::atomic_uint32_t  m_WriteBlockIndex{};
    std::vector<uint32_t> m_ActiveWriteBlocks;

    uint64_t m_MaxBlockSize  = 1u << 28;
    uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1;
    std::filesystem::path m_BlocksBasePath;

    std::atomic_uint64_t m_TotalSize{};
};

void blockstore_forcelink();

}  // namespace zen

// (diff-header residue: diff --git a/src/zenstore/include/zenstore/caslog.h b/src/zenstore/include/zenstore/caslog.h new file mode 100644 index 000000000..d8c3f22f3 --- /dev/null +++ b/src/zenstore/include/zenstore/caslog.h @@ -0,0 +1,91 @@)
// Copyright Epic Games, Inc.
All Rights Reserved. + +#pragma once + +#include <zencore/uid.h> +#include <zenutil/basicfile.h> + +namespace zen { + +class CasLogFile +{ +public: + CasLogFile(); + ~CasLogFile(); + + enum class Mode + { + kRead, + kWrite, + kTruncate + }; + + static bool IsValid(std::filesystem::path FileName, size_t RecordSize); + void Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode); + void Append(const void* DataPointer, uint64_t DataSize); + void Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount); + void Flush(); + void Close(); + uint64_t GetLogSize(); + uint64_t GetLogCount(); + +private: + struct FileHeader + { + uint8_t Magic[16]; + uint32_t RecordSize = 0; + Oid LogId; + uint32_t ValidatedTail = 0; + uint32_t Pad[6]; + uint32_t Checksum = 0; + + static const inline uint8_t MagicSequence[16] = {'.', '-', '=', ' ', 'C', 'A', 'S', 'L', 'O', 'G', 'v', '1', ' ', '=', '-', '.'}; + + ZENCORE_API uint32_t ComputeChecksum(); + void Finalize() { Checksum = ComputeChecksum(); } + }; + + static_assert(sizeof(FileHeader) == 64); + +private: + void Open(std::filesystem::path FileName, size_t RecordSize, BasicFile::Mode Mode); + + BasicFile m_File; + FileHeader m_Header; + size_t m_RecordSize = 1; + std::atomic<uint64_t> m_AppendOffset = 0; +}; + +template<typename T> +class TCasLogFile : public CasLogFile +{ +public: + static bool IsValid(std::filesystem::path FileName) { return CasLogFile::IsValid(FileName, sizeof(T)); } + void Open(std::filesystem::path FileName, Mode Mode) { CasLogFile::Open(FileName, sizeof(T), Mode); } + + // This should be called before the Replay() is called to do some basic sanity checking + bool Initialize() { return true; } + + void Replay(Invocable<const T&> auto Handler, uint64_t SkipEntryCount) + { + CasLogFile::Replay( + [&](const void* VoidPtr) { + const T& Record = *reinterpret_cast<const T*>(VoidPtr); + + Handler(Record); + }, + SkipEntryCount); + } + + void Append(const T& Record) + { + // TODO: implement 
some more efficient path here so we don't end up with + // a syscall per append + + CasLogFile::Append(&Record, sizeof Record); + } + + void Append(const std::span<T>& Records) { CasLogFile::Append(Records.data(), sizeof(T) * Records.size()); } +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h new file mode 100644 index 000000000..16ca78225 --- /dev/null +++ b/src/zenstore/include/zenstore/cidstore.h @@ -0,0 +1,87 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zenstore.h" + +#include <zencore/iohash.h> +#include <zenstore/hashkeyset.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <filesystem> + +namespace zen { + +class GcManager; +class CasStore; +class CompressedBuffer; +class IoBuffer; +class ScrubContext; + +/** Content Store + * + * Data in the content store is referenced by content identifiers (CIDs); it works + * with compressed buffers so the CID is expected to be the RAW hash. It stores the + * chunk directly under the RAW hash. + * This class maps uncompressed hashes (CIDs) to compressed hashes and may + * be used to deal with other kinds of indirections in the future. For example, if we want + * to support chunking then a CID may represent a list of chunks which could be concatenated + * to form the referenced chunk.
+ * + */ + +struct CidStoreSize +{ + uint64_t TinySize = 0; + uint64_t SmallSize = 0; + uint64_t LargeSize = 0; + uint64_t TotalSize = 0; +}; + +struct CidStoreConfiguration +{ + // Root directory for CAS store + std::filesystem::path RootDirectory; + + // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy + uint64_t TinyValueThreshold = 1024; + + // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy + uint64_t HugeValueThreshold = 1024 * 1024; +}; + +class CidStore +{ +public: + CidStore(GcManager& Gc); + ~CidStore(); + + struct InsertResult + { + bool New = false; + }; + enum class InsertMode + { + kCopyOnly, + kMayBeMovedInPlace + }; + + void Initialize(const CidStoreConfiguration& Config); + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace); + IoBuffer FindChunkByCid(const IoHash& DecompressedId); + bool ContainsChunk(const IoHash& DecompressedId); + void FilterChunks(HashKeySet& InOutChunks); + void Flush(); + void Scrub(ScrubContext& Ctx); + CidStoreSize TotalSize() const; + +private: + struct Impl; + std::unique_ptr<CasStore> m_CasStore; + std::unique_ptr<Impl> m_Impl; +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h new file mode 100644 index 000000000..e0354b331 --- /dev/null +++ b/src/zenstore/include/zenstore/gc.h @@ -0,0 +1,242 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zencore/iohash.h> +#include <zencore/thread.h> +#include <zenstore/caslog.h> + +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <filesystem> +#include <functional> +#include <optional> +#include <span> +#include <thread> + +#define ZEN_USE_REF_TRACKING 0 // This is not currently functional + +namespace spdlog { +class logger; +} + +namespace zen { + +class HashKeySet; +class GcManager; +class CidStore; +struct IoHash; + +/** GC clock + */ +class GcClock +{ +public: + using Clock = std::chrono::system_clock; + using TimePoint = Clock::time_point; + using Duration = Clock::duration; + using Tick = int64_t; + + static Tick TickCount() { return Now().time_since_epoch().count(); } + static TimePoint Now() { return Clock::now(); } + static TimePoint TimePointFromTick(const Tick TickCount) { return TimePoint{Duration{TickCount}}; } +}; + +/** Garbage Collection context object + */ +class GcContext +{ +public: + GcContext(const GcClock::TimePoint& ExpireTime); + ~GcContext(); + + void AddRetainedCids(std::span<const IoHash> Cid); + void SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys); + + void IterateCids(std::function<void(const IoHash&)> Callback); + + void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc); + void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc); + + void AddDeletedCids(std::span<const IoHash> Cas); + const HashKeySet& DeletedCids(); + + std::span<const IoHash> ExpiredCacheKeys(const std::string& CacheKeyContext) const; + + bool IsDeletionMode() const; + void SetDeletionMode(bool NewState); + + bool CollectSmallObjects() const; + void CollectSmallObjects(bool NewState); + + GcClock::TimePoint ExpireTime() const; + + void DiskReservePath(const std::filesystem::path& Path); + uint64_t ClaimGCReserve(); + +private: + struct GcState; + + std::unique_ptr<GcState> m_State; +}; + 
+/** GC root contributor + + Higher level data structures provide roots for the garbage collector, + which ultimately determine what is garbage and what data we need to + retain. + + */ +class GcContributor +{ +public: + GcContributor(GcManager& Gc); + ~GcContributor(); + + virtual void GatherReferences(GcContext& GcCtx) = 0; + +protected: + GcManager& m_Gc; +}; + +struct GcStorageSize +{ + uint64_t DiskSize{}; + uint64_t MemorySize{}; +}; + +/** GC storage provider + */ +class GcStorage +{ +public: + GcStorage(GcManager& Gc); + ~GcStorage(); + + virtual void CollectGarbage(GcContext& GcCtx) = 0; + virtual GcStorageSize StorageSize() const = 0; + +private: + GcManager& m_Gc; +}; + +/** GC orchestrator + */ +class GcManager +{ +public: + GcManager(); + ~GcManager(); + + void AddGcContributor(GcContributor* Contributor); + void RemoveGcContributor(GcContributor* Contributor); + + void AddGcStorage(GcStorage* Contributor); + void RemoveGcStorage(GcStorage* Contributor); + + void CollectGarbage(GcContext& GcCtx); + + GcStorageSize TotalStorageSize() const; + +#if ZEN_USE_REF_TRACKING + void OnNewCidReferences(std::span<IoHash> Hashes); + void OnCommittedCidReferences(std::span<IoHash> Hashes); + void OnDroppedCidReferences(std::span<IoHash> Hashes); +#endif + +private: + spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; + mutable RwLock m_Lock; + std::vector<GcContributor*> m_GcContribs; + std::vector<GcStorage*> m_GcStorage; + CidStore* m_CidStore = nullptr; +}; + +enum class GcSchedulerStatus : uint32_t +{ + kIdle, + kRunning, + kStopped +}; + +struct GcSchedulerConfig +{ + std::filesystem::path RootDirectory; + std::chrono::seconds MonitorInterval{30}; + std::chrono::seconds Interval{}; + std::chrono::seconds MaxCacheDuration{86400}; + bool CollectSmallObjects = true; + bool Enabled = true; + uint64_t DiskReserveSize = 1ul << 28; + uint64_t DiskSizeSoftLimit = 0; +}; + +class DiskUsageWindow +{ +public: + struct DiskUsageEntry + { + GcClock::Tick 
SampleTime; + uint64_t DiskUsage; + }; + + std::vector<DiskUsageEntry> m_LogWindow; + inline void Append(const DiskUsageEntry& Entry) { m_LogWindow.push_back(Entry); } + inline void Append(DiskUsageEntry&& Entry) { m_LogWindow.emplace_back(std::move(Entry)); } + void KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick); + std::vector<uint64_t> GetDiskDeltas(GcClock::Tick StartTick, + GcClock::Tick EndTick, + GcClock::Tick DeltaWidth, + uint64_t& OutMaxDelta) const; + GcClock::Tick FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick) const; +}; + +/** + * GC scheduler + */ +class GcScheduler +{ +public: + GcScheduler(GcManager& GcManager); + ~GcScheduler(); + + void Initialize(const GcSchedulerConfig& Config); + void Shutdown(); + GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } + + struct TriggerParams + { + bool CollectSmallObjects = false; + std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max(); + uint64_t DiskSizeSoftLimit = 0; + }; + + bool Trigger(const TriggerParams& Params); + +private: + void SchedulerThread(); + void CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects); + GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime); + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; + GcManager& m_GcManager; + GcSchedulerConfig m_Config; + GcClock::TimePoint m_LastGcTime{}; + GcClock::TimePoint m_LastGcExpireTime{}; + GcClock::TimePoint m_NextGcTime{}; + std::atomic_uint32_t m_Status{}; + std::thread m_GcThread; + std::mutex m_GcMutex; + std::condition_variable m_GcSignal; + std::optional<TriggerParams> m_TriggerParams; + + TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog; + DiskUsageWindow m_DiskUsageWindow; +}; + +void gc_forcelink(); + +} // namespace zen diff --git a/src/zenstore/include/zenstore/hashkeyset.h b/src/zenstore/include/zenstore/hashkeyset.h new file mode 100644 index 000000000..411a6256e --- 
/dev/null +++ b/src/zenstore/include/zenstore/hashkeyset.h @@ -0,0 +1,54 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zenstore.h" + +#include <zencore/iohash.h> + +#include <functional> +#include <unordered_set> + +namespace zen { + +/** Manage a set of IoHash values + */ + +class HashKeySet +{ +public: + void AddHashToSet(const IoHash& HashToAdd); + void AddHashesToSet(std::span<const IoHash> HashesToAdd); + void RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); + void IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const; + [[nodiscard]] inline bool ContainsHash(const IoHash& Hash) const { return m_HashSet.find(Hash) != m_HashSet.end(); } + [[nodiscard]] inline bool IsEmpty() const { return m_HashSet.empty(); } + [[nodiscard]] inline size_t GetSize() const { return m_HashSet.size(); } + + inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) const + { + for (const IoHash& Candidate : Candidates) + { + if (ContainsHash(Candidate)) + { + MatchFunc(Candidate); + } + } + } + + inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) const + { + for (const IoHash& Candidate : Candidates) + { + MatchFunc(Candidate, ContainsHash(Candidate)); + } + } + +private: + // Q: should we protect this with a lock, or is that a higher level concern? + std::unordered_set<IoHash, IoHash::Hasher> m_HashSet; +}; + +void hashkeyset_forcelink(); + +} // namespace zen diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h new file mode 100644 index 000000000..0b884fcc6 --- /dev/null +++ b/src/zenstore/include/zenstore/scrubcontext.h @@ -0,0 +1,41 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zencore/timer.h> +#include <zenstore/hashkeyset.h> + +namespace zen { + +/** Context object for data scrubbing + * + * Data scrubbing is when we traverse stored data to validate it and + * optionally correct/recover + */ + +class ScrubContext +{ +public: + virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); } + inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } + inline bool RunRecovery() const { return m_Recover; } + void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) + { + m_ChunkCount.fetch_add(ChunkCount); + m_ByteCount.fetch_add(ChunkBytes); + } + + inline uint64_t ScrubbedChunks() const { return m_ChunkCount; } + inline uint64_t ScrubbedBytes() const { return m_ByteCount; } + + const HashKeySet BadCids() const { return m_BadCid; } + +private: + uint64_t m_ScrubTime = GetHifreqTimerValue(); + bool m_Recover = true; + std::atomic<uint64_t> m_ChunkCount{0}; + std::atomic<uint64_t> m_ByteCount{0}; + HashKeySet m_BadCid; +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h new file mode 100644 index 000000000..46d62029d --- /dev/null +++ b/src/zenstore/include/zenstore/zenstore.h @@ -0,0 +1,13 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#define ZENSTORE_API + +namespace zen { + +ZENSTORE_API void zenstore_forcelinktests(); + +} diff --git a/src/zenstore/xmake.lua b/src/zenstore/xmake.lua new file mode 100644 index 000000000..4469c5650 --- /dev/null +++ b/src/zenstore/xmake.lua @@ -0,0 +1,9 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. 
+ +target('zenstore') + set_kind("static") + add_headerfiles("**.h") + add_files("**.cpp") + add_includedirs("include", {public=true}) + add_deps("zencore", "zenutil") + add_packages("vcpkg::robin-map") diff --git a/src/zenstore/zenstore.cpp b/src/zenstore/zenstore.cpp new file mode 100644 index 000000000..d87652fde --- /dev/null +++ b/src/zenstore/zenstore.cpp @@ -0,0 +1,32 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenstore/zenstore.h" + +#if ZEN_WITH_TESTS + +# include <zenstore/blockstore.h> +# include <zenstore/gc.h> +# include <zenstore/hashkeyset.h> +# include <zenutil/basicfile.h> + +# include "cas.h" +# include "compactcas.h" +# include "filecas.h" + +namespace zen { + +void +zenstore_forcelinktests() +{ + basicfile_forcelink(); + CAS_forcelink(); + filecas_forcelink(); + blockstore_forcelink(); + compactcas_forcelink(); + gc_forcelink(); + hashkeyset_forcelink(); +} + +} // namespace zen + +#endif |