diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /zenstore | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'zenstore')
| -rw-r--r-- | zenstore/blockstore.cpp | 1312 | ||||
| -rw-r--r-- | zenstore/cas.cpp | 355 | ||||
| -rw-r--r-- | zenstore/cas.h | 67 | ||||
| -rw-r--r-- | zenstore/caslog.cpp | 236 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 125 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 1511 | ||||
| -rw-r--r-- | zenstore/compactcas.h | 95 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 1452 | ||||
| -rw-r--r-- | zenstore/filecas.h | 102 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 1312 | ||||
| -rw-r--r-- | zenstore/hashkeyset.cpp | 60 | ||||
| -rw-r--r-- | zenstore/include/zenstore/blockstore.h | 175 | ||||
| -rw-r--r-- | zenstore/include/zenstore/caslog.h | 91 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cidstore.h | 87 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 242 | ||||
| -rw-r--r-- | zenstore/include/zenstore/hashkeyset.h | 54 | ||||
| -rw-r--r-- | zenstore/include/zenstore/scrubcontext.h | 41 | ||||
| -rw-r--r-- | zenstore/include/zenstore/zenstore.h | 13 | ||||
| -rw-r--r-- | zenstore/xmake.lua | 9 | ||||
| -rw-r--r-- | zenstore/zenstore.cpp | 32 |
20 files changed, 0 insertions, 7371 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp deleted file mode 100644 index 5dfa10c91..000000000 --- a/zenstore/blockstore.cpp +++ /dev/null @@ -1,1312 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenstore/blockstore.h> - -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/timer.h> - -#include <algorithm> - -#if ZEN_WITH_TESTS -# include <zencore/compactbinarybuilder.h> -# include <zencore/testing.h> -# include <zencore/testutils.h> -# include <zencore/workthreadpool.h> -# include <random> -#endif - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -////////////////////////////////////////////////////////////////////////// - -BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath) -{ -} - -BlockStoreFile::~BlockStoreFile() -{ - m_IoBuffer = IoBuffer(); - m_File.Detach(); -} - -const std::filesystem::path& -BlockStoreFile::GetPath() const -{ - return m_Path; -} - -void -BlockStoreFile::Open() -{ - m_File.Open(m_Path, BasicFile::Mode::kDelete); - void* FileHandle = m_File.Handle(); - m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize()); -} - -void -BlockStoreFile::Create(uint64_t InitialSize) -{ - auto ParentPath = m_Path.parent_path(); - if (!std::filesystem::is_directory(ParentPath)) - { - CreateDirectories(ParentPath); - } - - m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete); - void* FileHandle = m_File.Handle(); - - // We map our m_IoBuffer beyond the file size as we will grow it over time and want - // to be able to create sub-buffers of all the written range later - m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize); -} - -uint64_t -BlockStoreFile::FileSize() -{ - return m_File.FileSize(); -} - -void -BlockStoreFile::MarkAsDeleteOnClose() -{ - m_IoBuffer.MarkAsDeleteOnClose(); -} - -IoBuffer -BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size) -{ - return IoBuffer(m_IoBuffer, Offset, Size); -} - -void -BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset) -{ - m_File.Read(Data, Size, FileOffset); -} - -void -BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) -{ - m_File.Write(Data, Size, FileOffset); -} - -void -BlockStoreFile::Flush() -{ - m_File.Flush(); -} - -BasicFile& -BlockStoreFile::GetBasicFile() -{ - return m_File; -} - -void -BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) -{ - m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); -} - -constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; - -void -BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, - uint64_t MaxBlockSize, - uint64_t MaxBlockCount, - const std::vector<BlockStoreLocation>& KnownLocations) -{ - ZEN_ASSERT(MaxBlockSize > 0); - ZEN_ASSERT(MaxBlockCount > 0); - ZEN_ASSERT(IsPow2(MaxBlockCount)); - - m_TotalSize = 0; - m_BlocksBasePath = BlocksBasePath; - m_MaxBlockSize = MaxBlockSize; - - m_ChunkBlocks.clear(); - - std::unordered_set<uint32_t> KnownBlocks; - for (const auto& Entry : KnownLocations) - { - KnownBlocks.insert(Entry.BlockIndex); - } - - if (std::filesystem::is_directory(m_BlocksBasePath)) - { - std::vector<std::filesystem::path> FoldersToScan; - FoldersToScan.push_back(m_BlocksBasePath); - size_t FolderOffset = 0; - while (FolderOffset < FoldersToScan.size()) - { - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) - { - if (Entry.is_directory()) - { - FoldersToScan.push_back(Entry.path()); - continue; - } - if (Entry.is_regular_file()) - { - const std::filesystem::path Path = Entry.path(); - if (Path.extension() != GetBlockFileExtension()) - { - continue; - } - std::string FileName = PathToUtf8(Path.stem()); - uint32_t BlockIndex; - bool OK = ParseHexNumber(FileName, BlockIndex); - if (!OK) - { - continue; - } - if (!KnownBlocks.contains(BlockIndex)) - { - // Log removing unreferenced block - // Clear out unused blocks - ZEN_DEBUG("removing unused block at '{}'", Path); - std::error_code Ec; - std::filesystem::remove(Path, Ec); - if (Ec) - { - ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); - } - continue; - } - Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)}; - BlockFile->Open(); - m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed); - m_ChunkBlocks[BlockIndex] = BlockFile; - } - } - ++FolderOffset; - } - } - else - { - CreateDirectories(m_BlocksBasePath); - } -} - -void -BlockStore::Close() -{ - RwLock::ExclusiveLockScope InsertLock(m_InsertLock); - m_WriteBlock = nullptr; - m_CurrentInsertOffset = 0; - m_WriteBlockIndex = 0; - - m_ChunkBlocks.clear(); - m_BlocksBasePath.clear(); -} - -void -BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback) -{ - ZEN_ASSERT(Data != nullptr); - ZEN_ASSERT(Size > 0u); - ZEN_ASSERT(Size <= m_MaxBlockSize); - ZEN_ASSERT(Alignment > 0u); - - RwLock::ExclusiveLockScope InsertLock(m_InsertLock); - - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - bool IsWriting = !!m_WriteBlock; - if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize) - { - if (m_WriteBlock) - { - m_WriteBlock = nullptr; - } - - if (m_ChunkBlocks.size() == m_MaxBlockCount) - { - throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); - } - - WriteBlockIndex += IsWriting ? 1 : 0; - while (m_ChunkBlocks.contains(WriteBlockIndex)) - { - WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); - } - - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - - Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath)); - NewBlockFile->Create(m_MaxBlockSize); - - m_ChunkBlocks[WriteBlockIndex] = NewBlockFile; - m_WriteBlock = NewBlockFile; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - m_CurrentInsertOffset = 0; - } - uint64_t InsertOffset = m_CurrentInsertOffset; - m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); - uint64_t AlignedWriteSize = m_CurrentInsertOffset - InsertOffset; - Ref<BlockStoreFile> WriteBlock = m_WriteBlock; - m_ActiveWriteBlocks.push_back(WriteBlockIndex); - InsertLock.ReleaseNow(); - - WriteBlock->Write(Data, Size, InsertOffset); - m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed); - - Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}); - - { - RwLock::ExclusiveLockScope _(m_InsertLock); - m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); - } -} - -BlockStore::ReclaimSnapshotState -BlockStore::GetReclaimSnapshotState() -{ - ReclaimSnapshotState State; - RwLock::SharedLockScope _(m_InsertLock); - for (uint32_t BlockIndex : m_ActiveWriteBlocks) - { - State.m_ActiveWriteBlocks.insert(BlockIndex); - } - if (m_WriteBlock) - { - State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex); - } - State.BlockCount = m_ChunkBlocks.size(); - return State; -} - -IoBuffer -BlockStore::TryGetChunk(const BlockStoreLocation& Location) const -{ - RwLock::SharedLockScope InsertLock(m_InsertLock); - if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) - { - if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) - { - return Block->GetChunk(Location.Offset, Location.Size); - } - } - return IoBuffer(); -} - -void -BlockStore::Flush() -{ - RwLock::ExclusiveLockScope _(m_InsertLock); - if (m_CurrentInsertOffset > 0) - { - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); - m_WriteBlock = nullptr; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - m_CurrentInsertOffset = 0; - } -} - -void -BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, - const std::vector<BlockStoreLocation>& ChunkLocations, - const ChunkIndexArray& KeepChunkIndexes, - uint64_t PayloadAlignment, - bool DryRun, - const ReclaimCallback& ChangeCallback, - const ClaimDiskReserveCallback& DiskReserveCallback) -{ - if (ChunkLocations.empty()) - { - return; - } - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = ChunkLocations.size(); - uint64_t DeletedSize = 0; - uint64_t OldTotalSize = 0; - uint64_t NewTotalSize = 0; - - uint64_t MovedCount = 0; - uint64_t DeletedCount = 0; - - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { - ZEN_DEBUG( - "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " - "{} " - "of {} " - "chunks ({}).", - m_BlocksBasePath, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs), - NiceBytes(DeletedSize), - DeletedCount, - MovedCount, - TotalChunkCount, - NiceBytes(OldTotalSize)); - }); - - size_t BlockCount = Snapshot.BlockCount; - if (BlockCount == 0) - { - ZEN_DEBUG("garbage collect for '{}' SKIPPED, no blocks to process", m_BlocksBasePath); - return; - } - - std::unordered_set<size_t> KeepChunkMap; - KeepChunkMap.reserve(KeepChunkIndexes.size()); - for (size_t KeepChunkIndex : KeepChunkIndexes) - { - KeepChunkMap.insert(KeepChunkIndex); - } - - std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; - std::vector<ChunkIndexArray> BlockKeepChunks; - std::vector<ChunkIndexArray> BlockDeleteChunks; - - BlockIndexToChunkMapIndex.reserve(BlockCount); - BlockKeepChunks.reserve(BlockCount); - BlockDeleteChunks.reserve(BlockCount); - size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; - - size_t DeleteCount = 0; - for (size_t Index = 0; Index < TotalChunkCount; ++Index) - { - const BlockStoreLocation& Location = ChunkLocations[Index]; - OldTotalSize += Location.Size; - if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex)) - { - continue; - } - - auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); - size_t ChunkMapIndex = 0; - if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) - { - ChunkMapIndex = BlockKeepChunks.size(); - BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; - BlockKeepChunks.resize(ChunkMapIndex + 1); - BlockKeepChunks.back().reserve(GuesstimateCountPerBlock); - BlockDeleteChunks.resize(ChunkMapIndex + 1); - BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock); - } - else - { - ChunkMapIndex = BlockIndexPtr->second; - } - - if (KeepChunkMap.contains(Index)) - { - ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex]; - IndexMap.push_back(Index); - NewTotalSize += Location.Size; - continue; - } - ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex]; - IndexMap.push_back(Index); - DeleteCount++; - } - - std::unordered_set<uint32_t> BlocksToReWrite; - BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); - for (const auto& Entry : BlockIndexToChunkMapIndex) - { - uint32_t BlockIndex = Entry.first; - size_t ChunkMapIndex = Entry.second; - const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex]; - if (ChunkMap.empty()) - { - continue; - } - BlocksToReWrite.insert(BlockIndex); - } - - if (DryRun) - { - ZEN_DEBUG("garbage collect for '{}' DISABLED, found {} {} chunks of total {} {}", - m_BlocksBasePath, - DeleteCount, - NiceBytes(OldTotalSize - NewTotalSize), - TotalChunkCount, - OldTotalSize); - return; - } - - Ref<BlockStoreFile> NewBlockFile; - try - { - uint64_t WriteOffset = 0; - uint32_t NewBlockIndex = 0; - for (uint32_t BlockIndex : BlocksToReWrite) - { - const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; - - Ref<BlockStoreFile> OldBlockFile; - { - RwLock::SharedLockScope _i(m_InsertLock); - Stopwatch Timer; - const auto __ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - OldBlockFile = m_ChunkBlocks[BlockIndex]; - } - - if (!OldBlockFile) - { - // If the block file pointed to does not exist, move them all to deleted list - BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), - BlockKeepChunks[ChunkMapIndex].begin(), - BlockKeepChunks[ChunkMapIndex].end()); - BlockKeepChunks[ChunkMapIndex].clear(); - } - - const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex]; - if (KeepMap.empty()) - { - const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; - for (size_t DeleteIndex : DeleteMap) - { - DeletedSize += ChunkLocations[DeleteIndex].Size; - } - ChangeCallback({}, DeleteMap); - DeletedCount += DeleteMap.size(); - { - RwLock::ExclusiveLockScope _i(m_InsertLock); - Stopwatch Timer; - const auto __ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - if (OldBlockFile) - { - m_ChunkBlocks[BlockIndex] = nullptr; - ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); - m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); - OldBlockFile->MarkAsDeleteOnClose(); - } - } - continue; - } - - ZEN_ASSERT(OldBlockFile); - - MovedChunksArray MovedChunks; - std::vector<uint8_t> Chunk; - for (const size_t& ChunkIndex : KeepMap) - { - const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; - Chunk.resize(ChunkLocation.Size); - OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - - if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) - { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); - - if (NewBlockFile) - { - NewBlockFile->Flush(); - NewBlockFile = nullptr; - } - { - ChangeCallback(MovedChunks, {}); - MovedCount += KeepMap.size(); - MovedChunks.clear(); - RwLock::ExclusiveLockScope __(m_InsertLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - if (m_ChunkBlocks.size() == m_MaxBlockCount) - { - ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", - m_BlocksBasePath, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return; - } - while (m_ChunkBlocks.contains(NextBlockIndex)) - { - NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1); - } - std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); - NewBlockFile = new BlockStoreFile(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; - } - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); - if (Error) - { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); - return; - } - if (Space.Free < m_MaxBlockSize) - { - uint64_t ReclaimedSpace = DiskReserveCallback(); - if (Space.Free + ReclaimedSpace < m_MaxBlockSize) - { - ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", - m_BlocksBasePath, - m_MaxBlockSize, - NiceBytes(Space.Free + ReclaimedSpace)); - RwLock::ExclusiveLockScope _l(m_InsertLock); - Stopwatch Timer; - const auto __ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - m_ChunkBlocks.erase(NextBlockIndex); - return; - } - - ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", - m_BlocksBasePath, - ReclaimedSpace, - NiceBytes(Space.Free + ReclaimedSpace)); - } - NewBlockFile->Create(m_MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; - } - - NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); - uint64_t OldOffset = WriteOffset; - WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); - m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed); - } - Chunk.clear(); - if (NewBlockFile) - { - NewBlockFile->Flush(); - NewBlockFile = nullptr; - } - - const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; - for (size_t DeleteIndex : DeleteMap) - { - DeletedSize += ChunkLocations[DeleteIndex].Size; - } - - ChangeCallback(MovedChunks, DeleteMap); - MovedCount += KeepMap.size(); - DeletedCount += DeleteMap.size(); - MovedChunks.clear(); - { - RwLock::ExclusiveLockScope __(m_InsertLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - m_ChunkBlocks[BlockIndex] = nullptr; - ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); - m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); - OldBlockFile->MarkAsDeleteOnClose(); - } - } - } - catch (std::exception& ex) - { - ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what()); - if (NewBlockFile) - { - ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); - m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed); - NewBlockFile->MarkAsDeleteOnClose(); - } - } -} - -void -BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, - const IterateChunksSmallSizeCallback& SmallSizeCallback, - const IterateChunksLargeSizeCallback& LargeSizeCallback) -{ - std::vector<size_t> LocationIndexes; - LocationIndexes.reserve(ChunkLocations.size()); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) - { - LocationIndexes.push_back(ChunkIndex); - } - std::sort(LocationIndexes.begin(), LocationIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { - const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; - const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; - if (LocationA.BlockIndex < LocationB.BlockIndex) - { - return true; - } - else if (LocationA.BlockIndex > LocationB.BlockIndex) - { - return false; - } - return LocationA.Offset < LocationB.Offset; - }); - - IoBuffer ReadBuffer{ScrubSmallChunkWindowSize}; - void* BufferBase = ReadBuffer.MutableData(); - - RwLock::SharedLockScope _(m_InsertLock); - - auto GetNextRange = [&](size_t StartIndexOffset) { - size_t ChunkCount = 0; - size_t StartIndex = LocationIndexes[StartIndexOffset]; - const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; - uint64_t StartOffset = StartLocation.Offset; - while (StartIndexOffset + ChunkCount < LocationIndexes.size()) - { - size_t NextIndex = LocationIndexes[StartIndexOffset + ChunkCount]; - const BlockStoreLocation& Location = ChunkLocations[NextIndex]; - if (Location.BlockIndex != StartLocation.BlockIndex) - { - break; - } - if ((Location.Offset + Location.Size) - StartOffset > ScrubSmallChunkWindowSize) - { - break; - } - ++ChunkCount; - } - return ChunkCount; - }; - - size_t LocationIndexOffset = 0; - while (LocationIndexOffset < LocationIndexes.size()) - { - size_t ChunkIndex = LocationIndexes[LocationIndexOffset]; - const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; - - const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[FirstLocation.BlockIndex]; - if (!BlockFile) - { - while (ChunkLocations[ChunkIndex].BlockIndex == FirstLocation.BlockIndex) - { - SmallSizeCallback(ChunkIndex, nullptr, 0); - LocationIndexOffset++; - if (LocationIndexOffset == LocationIndexes.size()) - { - break; - } - ChunkIndex = LocationIndexes[LocationIndexOffset]; - } - continue; - } - size_t BlockSize = BlockFile->FileSize(); - size_t RangeCount = GetNextRange(LocationIndexOffset); - if (RangeCount > 0) - { - size_t LastChunkIndex = LocationIndexes[LocationIndexOffset + RangeCount - 1]; - const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; - uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; - BlockFile->Read(BufferBase, Size, FirstLocation.Offset); - for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) - { - size_t NextChunkIndex = LocationIndexes[LocationIndexOffset + RangeIndex]; - const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; - if (ChunkLocation.Size == 0 || (ChunkLocation.Offset + ChunkLocation.Size > BlockSize)) - { - SmallSizeCallback(NextChunkIndex, nullptr, 0); - continue; - } - void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; - SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); - } - LocationIndexOffset += RangeCount; - continue; - } - if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) - { - SmallSizeCallback(ChunkIndex, nullptr, 0); - LocationIndexOffset++; - continue; - } - LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); - LocationIndexOffset++; - } -} - -const char* -BlockStore::GetBlockFileExtension() -{ - return ".ucas"; -} - -std::filesystem::path -BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) -{ - ExtendablePathBuilder<256> Path; - - char BlockHexString[9]; - ToHexNumber(BlockIndex, BlockHexString); - - Path.Append(BlocksBasePath); - Path.AppendSeparator(); - Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); - Path.AppendSeparator(); - Path.Append(BlockHexString); - Path.Append(GetBlockFileExtension()); - return Path.ToPath(); -} - -#if ZEN_WITH_TESTS - -TEST_CASE("blockstore.blockstoredisklocation") -{ - BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0}; - CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4)); - - BlockStoreLocation MaxBlockIndex = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0}; - CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4)); - - BlockStoreLocation MaxOffset = BlockStoreLocation{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; - CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4)); - - BlockStoreLocation MaxSize = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits<uint32_t>::max()}; - CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4)); - - BlockStoreLocation MaxBlockIndexAndOffset = - BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; - CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4)); - - BlockStoreLocation MaxAll = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, - .Offset = BlockStoreDiskLocation::MaxOffset * 4, - .Size = std::numeric_limits<uint32_t>::max()}; - CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4)); - - BlockStoreLocation MaxAll4096 = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, - .Offset = BlockStoreDiskLocation::MaxOffset * 4096, - .Size = std::numeric_limits<uint32_t>::max()}; - CHECK(MaxAll4096 == BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096)); - - BlockStoreLocation Middle = BlockStoreLocation{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2, - .Offset = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4, - .Size = std::numeric_limits<uint32_t>::max() / 2}; - CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4)); -} - -TEST_CASE("blockstore.blockfile") -{ - ScopedTemporaryDirectory TempDir; - auto RootDirectory = TempDir.Path() / "blocks"; - CreateDirectories(RootDirectory); - - { - BlockStoreFile File1(RootDirectory / "1"); - File1.Create(16384); - CHECK(File1.FileSize() == 0); - File1.Write("data", 5, 0); - IoBuffer DataChunk = File1.GetChunk(0, 5); - File1.Write("boop", 5, 5); - IoBuffer BoopChunk = File1.GetChunk(5, 5); - const char* Data = static_cast<const char*>(DataChunk.GetData()); - CHECK(std::string(Data) == "data"); - const char* Boop = static_cast<const char*>(BoopChunk.GetData()); - CHECK(std::string(Boop) == "boop"); - File1.Flush(); - CHECK(File1.FileSize() == 10); - } - { - BlockStoreFile File1(RootDirectory / "1"); - File1.Open(); - - char DataRaw[5]; - File1.Read(DataRaw, 5, 0); - CHECK(std::string(DataRaw) == "data"); - IoBuffer DataChunk = File1.GetChunk(0, 5); - - char BoopRaw[5]; - File1.Read(BoopRaw, 5, 5); - CHECK(std::string(BoopRaw) == "boop"); - - IoBuffer BoopChunk = File1.GetChunk(5, 5); - const char* Data = static_cast<const char*>(DataChunk.GetData()); - CHECK(std::string(Data) == "data"); - const char* Boop = static_cast<const char*>(BoopChunk.GetData()); - CHECK(std::string(Boop) == "boop"); - } - - { - IoBuffer DataChunk; - IoBuffer BoopChunk; - - { - BlockStoreFile File1(RootDirectory / "1"); - File1.Open(); - DataChunk = File1.GetChunk(0, 5); - BoopChunk = File1.GetChunk(5, 5); - } - - CHECK(std::filesystem::exists(RootDirectory / "1")); - - const char* Data = static_cast<const char*>(DataChunk.GetData()); - CHECK(std::string(Data) == "data"); - const char* Boop = static_cast<const char*>(BoopChunk.GetData()); - CHECK(std::string(Boop) == "boop"); - } - CHECK(std::filesystem::exists(RootDirectory / "1")); - - { - IoBuffer DataChunk; - IoBuffer BoopChunk; - - { - BlockStoreFile File1(RootDirectory / "1"); - File1.Open(); - File1.MarkAsDeleteOnClose(); - DataChunk = File1.GetChunk(0, 5); - BoopChunk = File1.GetChunk(5, 5); - } - - const char* Data = static_cast<const char*>(DataChunk.GetData()); - CHECK(std::string(Data) == "data"); - const char* Boop = static_cast<const char*>(BoopChunk.GetData()); - CHECK(std::string(Boop) == "boop"); - } - CHECK(!std::filesystem::exists(RootDirectory / "1")); -} - -namespace blockstore::impl { - BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment) - { - BlockStoreLocation Location; - Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; }); - CHECK(Location.Size == String.length()); - return Location; - }; - - std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location) - { - IoBuffer ChunkData = Store.TryGetChunk(Location); - if (!ChunkData) - { - return ""; - } - std::string AsString((const char*)ChunkData.Data(), ChunkData.Size()); - return AsString; - }; - - std::vector<std::filesystem::path> GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories) - { - DirectoryContent DirectoryContent; - GetDirectoryContent(RootDir, - DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) | - (Directories ? DirectoryContent::IncludeDirsFlag : 0), - DirectoryContent); - std::vector<std::filesystem::path> Result; - Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end()); - Result.insert(Result.end(), DirectoryContent.Files.begin(), DirectoryContent.Files.end()); - return Result; - }; - - static IoBuffer CreateChunk(uint64_t Size) - { - static std::random_device rd; - static std::mt19937 g(rd()); - - std::vector<uint8_t> Values; - Values.resize(Size); - for (size_t Idx = 0; Idx < Size; ++Idx) - { - Values[Idx] = static_cast<uint8_t>(Idx); - } - std::shuffle(Values.begin(), Values.end(), g); - - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); - } -} // namespace blockstore::impl - -TEST_CASE("blockstore.chunks") -{ - using namespace blockstore::impl; - - ScopedTemporaryDirectory TempDir; - auto RootDirectory = TempDir.Path(); - - BlockStore Store; - Store.Initialize(RootDirectory, 128, 1024, {}); - IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); - CHECK(!BadChunk); - - std::string FirstChunkData = "This is the data of the first chunk that we will write"; - BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); - std::string SecondChunkData = "This is the data for the second chunk that we will write"; - BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); - - CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); - CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); - - std::string ThirdChunkData = - "This is a much longer string that will not fit in the first block so it should be placed in the second block"; - BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4); - CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex); - - CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); - CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); - CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData); -} - -TEST_CASE("blockstore.clean.stray.blocks") -{ - using namespace blockstore::impl; - - ScopedTemporaryDirectory TempDir; - auto RootDirectory = TempDir.Path(); - - BlockStore Store; - Store.Initialize(RootDirectory / "store", 128, 1024, {}); - - std::string FirstChunkData = "This is the data of the first chunk that we will write"; - BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); - std::string SecondChunkData = "This is the data for the second chunk that we will write"; - BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); - std::string ThirdChunkData = - "This is a much longer string that will not fit in the first block so it should be placed in the second block"; - WriteStringAsChunk(Store, ThirdChunkData, 4); - - Store.Close(); - - // Not referencing the second block means that we should be deleted - Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation}); - - CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1); -} - -TEST_CASE("blockstore.flush.forces.new.block") -{ - using namespace blockstore::impl; - - ScopedTemporaryDirectory TempDir; - auto RootDirectory = TempDir.Path(); - - BlockStore Store; - Store.Initialize(RootDirectory / "store", 128, 1024, {}); - - std::string FirstChunkData = "This is the data of the first chunk that we will write"; - WriteStringAsChunk(Store, FirstChunkData, 4); - Store.Flush(); - std::string SecondChunkData = "This is the data for the second chunk that we will write"; - WriteStringAsChunk(Store, SecondChunkData, 4); - Store.Flush(); - std::string ThirdChunkData = - "This is a much longer string that will not fit in the first block so it should be placed in the second block"; - WriteStringAsChunk(Store, ThirdChunkData, 4); - - CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3); -} - -TEST_CASE("blockstore.iterate.chunks") -{ - using namespace blockstore::impl; - - ScopedTemporaryDirectory TempDir; - auto RootDirectory = TempDir.Path(); - - BlockStore Store; - Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {}); - IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); - CHECK(!BadChunk); - - std::string FirstChunkData = "This is the data of the first chunk that we will write"; - BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); - - std::string SecondChunkData = "This is the data for the second chunk that we will write"; - BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); - Store.Flush(); - - std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); - BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); - - BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0}; - BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0, - .Offset = ScrubSmallChunkWindowSize, - .Size = ScrubSmallChunkWindowSize * 2}; - BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; - - Store.IterateChunks( - {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex}, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - switch (ChunkIndex) - { - case 0: - CHECK(Data); - CHECK(Size == FirstChunkData.size()); - CHECK(std::string((const char*)Data, Size) == FirstChunkData); - break; - case 1: - CHECK(Data); - CHECK(Size == SecondChunkData.size()); - CHECK(std::string((const char*)Data, Size) == SecondChunkData); - break; - case 2: - CHECK(false); - break; - case 3: - CHECK(!Data); - break; - case 4: - CHECK(!Data); - break; - case 5: - CHECK(!Data); - break; - default: - CHECK(false); - break; - } - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - switch (ChunkIndex) - { - case 0: - case 1: - CHECK(false); - break; - case 2: - { - CHECK(Size == VeryLargeChunk.size()); - char* Buffer = new char[Size]; - size_t HashOffset = 0; - File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { - memcpy(&Buffer[HashOffset], Data, Size); - HashOffset += Size; - }); - CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); - delete[] Buffer; - } - break; - case 3: - CHECK(false); - break; - case 4: - CHECK(false); - break; - case 5: - CHECK(false); - break; - default: - CHECK(false); - break; - } - }); -} - -TEST_CASE("blockstore.reclaim.space") -{ - using namespace blockstore::impl; - - ScopedTemporaryDirectory TempDir; - auto RootDirectory = TempDir.Path(); - - BlockStore Store; - Store.Initialize(RootDirectory / "store", 512, 1024, {}); - - constexpr size_t ChunkCount = 200; - constexpr size_t Alignment = 8; - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkHashes; - ChunkLocations.reserve(ChunkCount); - ChunkHashes.reserve(ChunkCount); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - IoBuffer Chunk = CreateChunk(57 + ChunkIndex); - - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); }); - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - std::vector<size_t> ChunksToKeep; - ChunksToKeep.reserve(ChunkLocations.size()); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - ChunksToKeep.push_back(ChunkIndex); - } - - Store.Flush(); - BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState(); - Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true); - - // If we keep all the chunks we should not get any callbacks on moved/deleted stuff - Store.ReclaimSpace( - State1, - ChunkLocations, - ChunksToKeep, - Alignment, - false, - [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); }, - []() { - CHECK(false); - return 0; - }); - - size_t DeleteChunkCount = 38; - ChunksToKeep.clear(); - for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex) - { - ChunksToKeep.push_back(ChunkIndex); - } - - std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations; - size_t MovedChunkCount = 0; - size_t DeletedChunkCount = 0; - Store.ReclaimSpace( - State1, - ChunkLocations, - ChunksToKeep, - Alignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { - for (const auto& MovedChunk : MovedChunks) - { - CHECK(MovedChunk.first >= DeleteChunkCount); - NewChunkLocations[MovedChunk.first] = MovedChunk.second; - } - MovedChunkCount += MovedChunks.size(); - for (size_t DeletedIndex : DeletedChunks) - { - CHECK(DeletedIndex < DeleteChunkCount); - } - DeletedChunkCount += DeletedChunks.size(); - }, - []() { - CHECK(false); - return 0; - }); - CHECK(MovedChunkCount <= DeleteChunkCount); - CHECK(DeletedChunkCount == DeleteChunkCount); - ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end()); - - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); - if (ChunkIndex >= DeleteChunkCount) - { - IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - } - } - - NewChunkLocations = ChunkLocations; - MovedChunkCount = 0; - DeletedChunkCount = 0; - Store.ReclaimSpace( - State1, - ChunkLocations, - {}, - Alignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { - CHECK(MovedChunks.empty()); - DeletedChunkCount += DeletedChunks.size(); - }, - []() { - CHECK(false); - return 0; - }); - CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount); -} - -TEST_CASE("blockstore.thread.read.write") -{ - using namespace blockstore::impl; - - ScopedTemporaryDirectory TempDir; - auto RootDirectory = TempDir.Path(); - - BlockStore Store; - Store.Initialize(RootDirectory / "store", 1088, 1024, {}); - - constexpr size_t ChunkCount = 1000; - constexpr size_t Alignment = 8; - std::vector<IoBuffer> Chunks; - std::vector<IoHash> ChunkHashes; - Chunks.reserve(ChunkCount); - ChunkHashes.reserve(ChunkCount); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2); - Chunks.push_back(Chunk); - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - std::vector<BlockStoreLocation> ChunkLocations; - ChunkLocations.resize(ChunkCount); - - WorkerThreadPool WorkerPool(8); - std::atomic<size_t> WorkCompleted = 0; - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - - WorkCompleted = 0; - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - - std::vector<BlockStoreLocation> SecondChunkLocations; - SecondChunkLocations.resize(ChunkCount); - WorkCompleted = 0; - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { - SecondChunkLocations[ChunkIndex] = L; - }); - WorkCompleted.fetch_add(1); - }); - WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size() * 2) - { - Sleep(1); - } -} - -#endif - -void -blockstore_forcelink() -{ -} - -} // namespace zen diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp deleted file mode 100644 index fdec78c60..000000000 --- a/zenstore/cas.cpp +++ /dev/null @@ -1,355 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "cas.h" - -#include "compactcas.h" -#include "filecas.h" - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/memory.h> -#include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> -#include <zencore/thread.h> -#include <zencore/trace.h> -#include <zencore/uid.h> -#include <zenstore/cidstore.h> -#include <zenstore/gc.h> -#include <zenstore/scrubcontext.h> - -#include <gsl/gsl-lite.hpp> - -#include <filesystem> -#include <functional> -#include <unordered_map> - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -/** - * CAS store implementation - * - * Uses a basic strategy of splitting payloads by size, to improve ability to reclaim space - * quickly for unused large chunks and to maintain locality for small chunks which are - * frequently accessed together. - * - */ -class CasImpl : public CasStore -{ -public: - CasImpl(GcManager& Gc); - virtual ~CasImpl(); - - virtual void Initialize(const CidStoreConfiguration& InConfig) override; - virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override; - virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; - virtual bool ContainsChunk(const IoHash& ChunkHash) override; - virtual void FilterChunks(HashKeySet& InOutChunks) override; - virtual void Flush() override; - virtual void Scrub(ScrubContext& Ctx) override; - virtual void GarbageCollect(GcContext& GcCtx) override; - virtual CidStoreSize TotalSize() const override; - -private: - CasContainerStrategy m_TinyStrategy; - CasContainerStrategy m_SmallStrategy; - FileCasStrategy m_LargeStrategy; - CbObject m_ManifestObject; - - enum class StorageScheme - { - Legacy = 0, - WithCbManifest = 1 - }; - - StorageScheme m_StorageScheme = StorageScheme::Legacy; - - bool OpenOrCreateManifest(); - void UpdateManifest(); -}; - -CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc) -{ -} - -CasImpl::~CasImpl() -{ -} - -void -CasImpl::Initialize(const CidStoreConfiguration& InConfig) -{ - m_Config = InConfig; - - ZEN_INFO("initializing CAS pool at '{}'", m_Config.RootDirectory); - - // Ensure root directory exists - create if it doesn't exist already - - std::filesystem::create_directories(m_Config.RootDirectory); - - // Open or create manifest - - const bool IsNewStore = OpenOrCreateManifest(); - - // Initialize payload storage - - m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); - m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block - m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block -} - -bool -CasImpl::OpenOrCreateManifest() -{ - bool IsNewStore = false; - - std::filesystem::path ManifestPath = m_Config.RootDirectory; - ManifestPath /= ".ucas_root"; - - std::error_code Ec; - BasicFile ManifestFile; - ManifestFile.Open(ManifestPath.c_str(), BasicFile::Mode::kRead, Ec); - - bool ManifestIsOk = false; - - if (Ec) - { - if (Ec == std::errc::no_such_file_or_directory) - { - IsNewStore = true; - } - } - else - { - IoBuffer ManifestBuffer = ManifestFile.ReadAll(); - ManifestFile.Close(); - - if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#') - { - // Old-style manifest, does not contain any useful information, so we may as well update it - } - else - { - CbObject Manifest{SharedBuffer(ManifestBuffer)}; - CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All); - - if (ValidationResult == CbValidateError::None) - { - if (Manifest["id"]) - { - ManifestIsOk = true; - } - } - else - { - ZEN_WARN("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult)); - } - - if (ManifestIsOk) - { - m_ManifestObject = std::move(Manifest); - } - } - } - - if (!ManifestIsOk) - { - UpdateManifest(); - } - - return IsNewStore; -} - -void -CasImpl::UpdateManifest() -{ - if (!m_ManifestObject) - { - CbObjectWriter Cbo; - Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now(); - m_ManifestObject = Cbo.Save(); - } - - // Write manifest to file - - std::filesystem::path ManifestPath = m_Config.RootDirectory; - ManifestPath /= ".ucas_root"; - - // This will throw on failure - - ZEN_TRACE("Writing new manifest to '{}'", ManifestPath); - - BasicFile Marker; - Marker.Open(ManifestPath.c_str(), BasicFile::Mode::kTruncate); - Marker.Write(m_ManifestObject.GetBuffer(), 0); -} - -CasStore::InsertResult -CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) -{ - ZEN_TRACE_CPU("CAS::InsertChunk"); - - const uint64_t ChunkSize = Chunk.Size(); - - if (ChunkSize < m_Config.TinyValueThreshold) - { - ZEN_ASSERT(ChunkSize); - - return m_TinyStrategy.InsertChunk(Chunk, ChunkHash); - } - else if (ChunkSize < m_Config.HugeValueThreshold) - { - return m_SmallStrategy.InsertChunk(Chunk, ChunkHash); - } - - return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode); -} - -IoBuffer -CasImpl::FindChunk(const IoHash& ChunkHash) -{ - ZEN_TRACE_CPU("CAS::FindChunk"); - - if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash)) - { - return Found; - } - - if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash)) - { - return Found; - } - - if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash)) - { - return Found; - } - - // Not found - return IoBuffer{}; -} - -bool -CasImpl::ContainsChunk(const IoHash& ChunkHash) -{ - return m_SmallStrategy.HaveChunk(ChunkHash) || m_TinyStrategy.HaveChunk(ChunkHash) || m_LargeStrategy.HaveChunk(ChunkHash); -} - -void -CasImpl::FilterChunks(HashKeySet& InOutChunks) -{ - m_SmallStrategy.FilterChunks(InOutChunks); - m_TinyStrategy.FilterChunks(InOutChunks); - m_LargeStrategy.FilterChunks(InOutChunks); -} - -void -CasImpl::Flush() -{ - m_SmallStrategy.Flush(); - m_TinyStrategy.Flush(); - m_LargeStrategy.Flush(); -} - -void -CasImpl::Scrub(ScrubContext& Ctx) -{ - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_SmallStrategy.Scrub(Ctx); - m_TinyStrategy.Scrub(Ctx); - m_LargeStrategy.Scrub(Ctx); -} - -void -CasImpl::GarbageCollect(GcContext& GcCtx) -{ - m_SmallStrategy.CollectGarbage(GcCtx); - m_TinyStrategy.CollectGarbage(GcCtx); - m_LargeStrategy.CollectGarbage(GcCtx); -} - -CidStoreSize -CasImpl::TotalSize() const -{ - const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize; - const uint64_t Small = m_SmallStrategy.StorageSize().DiskSize; - const uint64_t Large = m_LargeStrategy.StorageSize().DiskSize; - - return {.TinySize = Tiny, .SmallSize = Small, .LargeSize = Large, .TotalSize = Tiny + Small + Large}; -} - -////////////////////////////////////////////////////////////////////////// - -std::unique_ptr<CasStore> -CreateCasStore(GcManager& Gc) -{ - return std::make_unique<CasImpl>(Gc); -} - -////////////////////////////////////////////////////////////////////////// -// -// Testing related code follows... -// - -#if ZEN_WITH_TESTS - -TEST_CASE("CasStore") -{ - ScopedTemporaryDirectory TempDir; - - CidStoreConfiguration config; - config.RootDirectory = TempDir.Path(); - - GcManager Gc; - - std::unique_ptr<CasStore> Store = CreateCasStore(Gc); - Store->Initialize(config); - - ScrubContext Ctx; - Store->Scrub(Ctx); - - IoBuffer Value1{16}; - memcpy(Value1.MutableData(), "1234567890123456", 16); - IoHash Hash1 = IoHash::HashBuffer(Value1.Data(), Value1.Size()); - CasStore::InsertResult Result1 = Store->InsertChunk(Value1, Hash1); - CHECK(Result1.New); - - IoBuffer Value2{16}; - memcpy(Value2.MutableData(), "ABCDEFGHIJKLMNOP", 16); - IoHash Hash2 = IoHash::HashBuffer(Value2.Data(), Value2.Size()); - CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2); - CHECK(Result2.New); - - HashKeySet ChunkSet; - ChunkSet.AddHashToSet(Hash1); - ChunkSet.AddHashToSet(Hash2); - - Store->FilterChunks(ChunkSet); - CHECK(ChunkSet.IsEmpty()); - - IoBuffer Lookup1 = Store->FindChunk(Hash1); - CHECK(Lookup1); - IoBuffer Lookup2 = Store->FindChunk(Hash2); - CHECK(Lookup2); -} - -void -CAS_forcelink() -{ -} - -#endif - -} // namespace zen diff --git a/zenstore/cas.h b/zenstore/cas.h deleted file mode 100644 index 9c48d4707..000000000 --- a/zenstore/cas.h +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/blake3.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/refcount.h> -#include <zencore/timer.h> -#include <zenstore/cidstore.h> -#include <zenstore/hashkeyset.h> - -#include <atomic> -#include <filesystem> -#include <functional> -#include <memory> -#include <string> -#include <unordered_set> - -namespace zen { - -class GcContext; -class GcManager; -class ScrubContext; - -/** Content Addressable Storage interface - - */ - -class CasStore -{ -public: - virtual ~CasStore() = default; - - const CidStoreConfiguration& Config() { return m_Config; } - - struct InsertResult - { - bool New = false; - }; - - enum class InsertMode - { - kCopyOnly, - kMayBeMovedInPlace - }; - - virtual void Initialize(const CidStoreConfiguration& Config) = 0; - virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0; - virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; - virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; - virtual void FilterChunks(HashKeySet& InOutChunks) = 0; - virtual void Flush() = 0; - virtual void Scrub(ScrubContext& Ctx) = 0; - virtual void GarbageCollect(GcContext& GcCtx) = 0; - virtual CidStoreSize TotalSize() const = 0; - -protected: - CidStoreConfiguration m_Config; - uint64_t m_LastScrubTime = 0; -}; - -ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc); - -void CAS_forcelink(); - -} // namespace zen diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp deleted file mode 100644 index 2a978ae12..000000000 --- a/zenstore/caslog.cpp +++ /dev/null @@ -1,236 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenstore/caslog.h> - -#include "compactcas.h" - -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/memory.h> -#include <zencore/string.h> -#include <zencore/thread.h> -#include <zencore/uid.h> - -#include <xxhash.h> - -#include <gsl/gsl-lite.hpp> - -#include <filesystem> -#include <functional> - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -uint32_t -CasLogFile::FileHeader::ComputeChecksum() -{ - return XXH32(&this->Magic, sizeof(FileHeader) - 4, 0xC0C0'BABA); -} - -CasLogFile::CasLogFile() -{ -} - -CasLogFile::~CasLogFile() -{ -} - -bool -CasLogFile::IsValid(std::filesystem::path FileName, size_t RecordSize) -{ - if (!std::filesystem::is_regular_file(FileName)) - { - return false; - } - BasicFile File; - - std::error_code Ec; - File.Open(FileName, BasicFile::Mode::kRead, Ec); - if (Ec) - { - return false; - } - - FileHeader Header; - if (File.FileSize() < sizeof(Header)) - { - return false; - } - - // Validate header and log contents and prepare for appending/replay - File.Read(&Header, sizeof Header, 0); - - if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum())) - { - return false; - } - if (Header.RecordSize != RecordSize) - { - return false; - } - return true; -} - -void -CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode) -{ - m_RecordSize = RecordSize; - - std::error_code Ec; - BasicFile::Mode FileMode = BasicFile::Mode::kRead; - switch (Mode) - { - case Mode::kWrite: - FileMode = BasicFile::Mode::kWrite; - break; - case Mode::kTruncate: - FileMode = BasicFile::Mode::kTruncate; - break; - } - - m_File.Open(FileName, FileMode, Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to open log file '{}'", FileName)); - } - - uint64_t AppendOffset = 0; - - if ((Mode == Mode::kTruncate) || (m_File.FileSize() < sizeof(FileHeader))) - { - if (Mode == Mode::kRead) - { - throw std::runtime_error(fmt::format("Mangled log header (file to small) in '{}'", FileName)); - } - // Initialize log by writing header - FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0}; - memcpy(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic); - Header.Finalize(); - - m_File.Write(&Header, sizeof Header, 0); - - AppendOffset = sizeof(FileHeader); - - m_Header = Header; - } - else - { - FileHeader Header; - m_File.Read(&Header, sizeof Header, 0); - - if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum())) - { - throw std::runtime_error(fmt::format("Mangled log header (invalid header magic) in '{}'", FileName)); - } - if (Header.RecordSize != RecordSize) - { - throw std::runtime_error(fmt::format("Mangled log header (mismatch in record size, expected {}, found {}) in '{}'", - RecordSize, - Header.RecordSize, - FileName)); - } - - AppendOffset = m_File.FileSize(); - - // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended - - AppendOffset -= sizeof Header; - AppendOffset -= AppendOffset % RecordSize; - AppendOffset += sizeof Header; - - m_Header = Header; - } - - m_AppendOffset = AppendOffset; -} - -void -CasLogFile::Close() -{ - // TODO: update header and maybe add trailer - Flush(); - - m_File.Close(); -} - -uint64_t -CasLogFile::GetLogSize() -{ - return m_File.FileSize(); -} - -uint64_t -CasLogFile::GetLogCount() -{ - uint64_t LogFileSize = m_AppendOffset.load(std::memory_order_acquire); - if (LogFileSize < sizeof(FileHeader)) - { - return 0; - } - const uint64_t LogBaseOffset = sizeof(FileHeader); - const size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize; - return LogEntryCount; -} - -void -CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount) -{ - uint64_t LogFileSize = m_File.FileSize(); - - // Ensure we end up on a clean boundary - uint64_t LogBaseOffset = sizeof(FileHeader); - size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize; - - if (LogEntryCount <= SkipEntryCount) - { - return; - } - - LogBaseOffset += SkipEntryCount * m_RecordSize; - LogEntryCount -= SkipEntryCount; - - // This should really be streaming the data rather than just - // reading it into memory, though we don't tend to get very - // large logs so it may not matter - - const uint64_t LogDataSize = LogEntryCount * m_RecordSize; - - std::vector<uint8_t> ReadBuffer; - ReadBuffer.resize(LogDataSize); - - m_File.Read(ReadBuffer.data(), LogDataSize, LogBaseOffset); - - for (int i = 0; i < int(LogEntryCount); ++i) - { - Handler(ReadBuffer.data() + (i * m_RecordSize)); - } - - m_AppendOffset = LogBaseOffset + (m_RecordSize * LogEntryCount); -} - -void -CasLogFile::Append(const void* DataPointer, uint64_t DataSize) -{ - ZEN_ASSERT((DataSize % m_RecordSize) == 0); - - uint64_t AppendOffset = m_AppendOffset.fetch_add(DataSize); - - std::error_code Ec; - m_File.Write(DataPointer, gsl::narrow<uint32_t>(DataSize), AppendOffset, Ec); - - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to write to log file '{}'", PathFromHandle(m_File.Handle()))); - } -} - -void -CasLogFile::Flush() -{ - m_File.Flush(); -} - -} // namespace zen diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp deleted file mode 100644 index 5a5116faf..000000000 --- a/zenstore/cidstore.cpp +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "zenstore/cidstore.h" - -#include <zencore/compress.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/string.h> -#include <zenstore/scrubcontext.h> - -#include "cas.h" - -#include <filesystem> - -namespace zen { - -struct CidStore::Impl -{ - Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {} - - CasStore& m_CasStore; - - void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); } - - CidStore::InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, CidStore::InsertMode Mode) - { -#ifndef NDEBUG - IoHash VerifyRawHash; - uint64_t _; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHash == VerifyRawHash); -#endif // NDEBUG - IoBuffer Payload(ChunkData); - Payload.SetContentType(ZenContentType::kCompressedBinary); - - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, RawHash, static_cast<CasStore::InsertMode>(Mode)); - - return {.New = Result.New}; - } - - IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); } - - bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); } - - void FilterChunks(HashKeySet& InOutChunks) - { - InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); }); - } - - void Flush() { m_CasStore.Flush(); } - - void Scrub(ScrubContext& Ctx) - { - if (Ctx.ScrubTimestamp() == m_LastScrubTime) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_CasStore.Scrub(Ctx); - } - - uint64_t m_LastScrubTime = 0; -}; - -////////////////////////////////////////////////////////////////////////// - -CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore)) -{ -} - -CidStore::~CidStore() -{ -} - -void -CidStore::Initialize(const CidStoreConfiguration& Config) -{ - m_Impl->Initialize(Config); -} - -CidStore::InsertResult -CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode) -{ - return m_Impl->AddChunk(ChunkData, RawHash, Mode); -} - -IoBuffer -CidStore::FindChunkByCid(const IoHash& DecompressedId) -{ - return m_Impl->FindChunkByCid(DecompressedId); -} - -bool -CidStore::ContainsChunk(const IoHash& DecompressedId) -{ - return m_Impl->ContainsChunk(DecompressedId); -} - -void -CidStore::FilterChunks(HashKeySet& InOutChunks) -{ - return m_Impl->FilterChunks(InOutChunks); -} - -void -CidStore::Flush() -{ - m_Impl->Flush(); -} - -void -CidStore::Scrub(ScrubContext& Ctx) -{ - m_Impl->Scrub(Ctx); -} - -CidStoreSize -CidStore::TotalSize() const -{ - return m_Impl->m_CasStore.TotalSize(); -} - -} // namespace zen diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp deleted file mode 100644 index 7b2c21b0f..000000000 --- a/zenstore/compactcas.cpp +++ /dev/null @@ -1,1511 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "compactcas.h" - -#include "cas.h" - -#include <zencore/compress.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zenstore/scrubcontext.h> - -#include <gsl/gsl-lite.hpp> - -#include <xxhash.h> - -#if ZEN_WITH_TESTS -# include <zencore/compactbinarybuilder.h> -# include <zencore/testing.h> -# include <zencore/testutils.h> -# include <zencore/workthreadpool.h> -# include <zenstore/cidstore.h> -# include <algorithm> -# include <random> -#endif - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -struct CasDiskIndexHeader -{ - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t CurrentVersion = 1; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t EntryCount = 0; - uint64_t LogPosition = 0; - uint32_t PayloadAlignment = 0; - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) - { - return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } -}; - -static_assert(sizeof(CasDiskIndexHeader) == 32); - -namespace { - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".ulog"; - - std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return RootPath / ContainerBaseName; - } - - std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); - } - - std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); - } - - std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); - } - - std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return GetBasePath(RootPath, ContainerBaseName) / "blocks"; - } - - bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) - { - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); - return false; - } - if (Entry.Flags & CasDiskIndexEntry::kTombstone) - { - return true; - } - if (Entry.ContentType != ZenContentType::kUnknownContentType) - { - OutReason = - fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); - return false; - } - uint64_t Size = Entry.Location.GetSize(); - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return false; - } - return true; - } - -} // namespace - -////////////////////////////////////////////////////////////////////////// - -CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas")) -{ -} - -CasContainerStrategy::~CasContainerStrategy() -{ -} - -void -CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory, - const std::string_view ContainerBaseName, - uint32_t MaxBlockSize, - uint64_t Alignment, - bool IsNewStore) -{ - ZEN_ASSERT(IsPow2(Alignment)); - ZEN_ASSERT(!m_IsInitialized); - ZEN_ASSERT(MaxBlockSize > 0); - - m_RootDirectory = RootDirectory; - m_ContainerBaseName = ContainerBaseName; - m_PayloadAlignment = Alignment; - m_MaxBlockSize = MaxBlockSize; - m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName); - - OpenContainer(IsNewStore); - - m_IsInitialized = true; -} - -CasStore::InsertResult -CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) -{ - { - RwLock::SharedLockScope _(m_LocationMapLock); - if (m_LocationMap.contains(ChunkHash)) - { - return CasStore::InsertResult{.New = false}; - } - } - - // We can end up in a situation that InsertChunk writes the same chunk data in - // different locations. - // We release the insert lock once we have the correct WriteBlock ready and we know - // where to write the data. If a new InsertChunk request for the same chunk hash/data - // comes in before we update m_LocationMap below we will have a race. - // The outcome of that is that we will write the chunk data in more than one location - // but the chunk hash will only point to one of the chunks. - // We will in that case waste space until the next GC operation. - // - // This should be a rare occasion and the current flow reduces the time we block for - // reads, insert and GC. - - m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) { - BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); - const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; - m_CasLog.Append(IndexEntry); - { - RwLock::ExclusiveLockScope _(m_LocationMapLock); - m_LocationMap.emplace(ChunkHash, DiskLocation); - } - }); - - return CasStore::InsertResult{.New = true}; -} - -CasStore::InsertResult -CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) -{ -#if !ZEN_WITH_TESTS - ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); -#endif - return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); -} - -IoBuffer -CasContainerStrategy::FindChunk(const IoHash& ChunkHash) -{ - RwLock::SharedLockScope _(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt == m_LocationMap.end()) - { - return IoBuffer(); - } - const BlockStoreLocation& Location = KeyIt->second.Get(m_PayloadAlignment); - - IoBuffer Chunk = m_BlockStore.TryGetChunk(Location); - return Chunk; -} - -bool -CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) -{ - RwLock::SharedLockScope _(m_LocationMapLock); - return m_LocationMap.contains(ChunkHash); -} - -void -CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) -{ - // This implementation is good enough for relatively small - // chunk sets (in terms of chunk identifiers), but would - // benefit from a better implementation which removes - // items incrementally for large sets, especially when - // we're likely to already have a large proportion of the - // chunks in the set - - InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); -} - -void -CasContainerStrategy::Flush() -{ - m_BlockStore.Flush(); - m_CasLog.Flush(); - MakeIndexSnapshot(); -} - -void -CasContainerStrategy::Scrub(ScrubContext& Ctx) -{ - std::vector<IoHash> BadKeys; - uint64_t ChunkCount{0}, ChunkBytes{0}; - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkIndexToChunkHash; - - RwLock::SharedLockScope _(m_LocationMapLock); - - uint64_t TotalChunkCount = m_LocationMap.size(); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); - { - for (const auto& Entry : m_LocationMap) - { - const IoHash& ChunkHash = Entry.first; - const BlockStoreDiskLocation& DiskLocation = Entry.second; - BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); - - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash.push_back(ChunkHash); - } - } - - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - BadKeys.push_back(Hash); - return; - } - - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - if (RawHash != Hash) - { - // Hash mismatch - BadKeys.push_back(Hash); - return; - } - return; - } -#if ZEN_WITH_TESTS - IoHash ComputedHash = IoHash::HashBuffer(Data, Size); - if (ComputedHash == Hash) - { - return; - } -#endif - BadKeys.push_back(Hash); - }; - - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - - IoHash RawHash; - uint64_t RawSize; - // TODO: Add API to verify compressed buffer without having to memorymap the whole file - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - if (RawHash != Hash) - { - // Hash mismatch - BadKeys.push_back(Hash); - return; - } - return; - } -#if ZEN_WITH_TESTS - IoHashStream Hasher; - File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); - if (ComputedHash == Hash) - { - return; - } -#endif - BadKeys.push_back(Hash); - }; - - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); - - _.ReleaseNow(); - - Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - - if (!BadKeys.empty()) - { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); - - if (Ctx.RunRecovery()) - { - // Deal with bad chunks by removing them from our lookup map - - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(BadKeys.size()); - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - for (const IoHash& ChunkHash : BadKeys) - { - const auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt == m_LocationMap.end()) - { - // Might have been GC'd - continue; - } - LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); - m_LocationMap.erase(KeyIt); - } - } - m_CasLog.Append(LogEntries); - } - } - - // Let whomever it concerns know about the bad chunks. This could - // be used to invalidate higher level data structures more efficiently - // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadKeys); - - ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); -} - -void -CasContainerStrategy::CollectGarbage(GcContext& GcCtx) -{ - // It collects all the blocks that we want to delete chunks from. For each such - // block we keep a list of chunks to retain and a list of chunks to delete. - // - // If there is a block that we are currently writing to, that block is omitted - // from the garbage collection. - // - // Next it will iterate over all blocks that we want to remove chunks from. - // If the block is empty after removal of chunks we mark the block as pending - // delete - we want to delete it as soon as there are no IoBuffers using the - // block file. - // Once complete we update the m_LocationMap by removing the chunks. - // - // If the block is non-empty we write out the chunks we want to keep to a new - // block file (creating new block files as needed). - // - // We update the index as we complete each new block file. This makes it possible - // to break the GC if we want to limit time for execution. - // - // GC can very parallell to regular operation - it will block while taking - // a snapshot of the current m_LocationMap state and while moving blocks it will - // do a blocking operation and update the m_LocationMap after each new block is - // written and figuring out the path to the next new block. - - ZEN_DEBUG("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); - - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - - LocationMap_t LocationMap; - BlockStore::ReclaimSnapshotState BlockStoreState; - { - RwLock::SharedLockScope ___(m_LocationMapLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - LocationMap = m_LocationMap; - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - } - - uint64_t TotalChunkCount = LocationMap.size(); - - std::vector<IoHash> TotalChunkHashes; - TotalChunkHashes.reserve(TotalChunkCount); - for (const auto& Entry : LocationMap) - { - TotalChunkHashes.push_back(Entry.first); - } - - std::vector<BlockStoreLocation> ChunkLocations; - BlockStore::ChunkIndexArray KeepChunkIndexes; - std::vector<IoHash> ChunkIndexToChunkHash; - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); - - GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = LocationMap.find(ChunkHash); - const BlockStoreDiskLocation& DiskLocation = KeyIt->second; - BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); - size_t ChunkIndex = ChunkLocations.size(); - - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; - if (Keep) - { - KeepChunkIndexes.push_back(ChunkIndex); - } - }); - - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); - if (!PerformDelete) - { - m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); - return; - } - - std::vector<IoHash> DeletedChunks; - m_BlockStore.ReclaimSpace( - BlockStoreState, - ChunkLocations, - KeepChunkIndexes, - m_PayloadAlignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}}); - } - for (const size_t ChunkIndex : RemovedChunks) - { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash]; - LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone}); - DeletedChunks.push_back(ChunkHash); - } - - m_CasLog.Append(LogEntries); - m_CasLog.Flush(); - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - for (const CasDiskIndexEntry& Entry : LogEntries) - { - if (Entry.Flags & CasDiskIndexEntry::kTombstone) - { - m_LocationMap.erase(Entry.Key); - continue; - } - m_LocationMap[Entry.Key] = Entry.Location; - } - } - }, - [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); - - GcCtx.AddDeletedCids(DeletedChunks); -} - -void -CasContainerStrategy::MakeIndexSnapshot() -{ - uint64_t LogCount = m_CasLog.GetLogCount(); - if (m_LogFlushPosition == LogCount) - { - return; - } - - ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_RootDirectory / m_ContainerBaseName, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); - fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(TempIndexPath)) - { - fs::remove(TempIndexPath); - } - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, TempIndexPath); - } - - try - { - // Write the current state of the location map to a new index state - std::vector<CasDiskIndexEntry> Entries; - - { - RwLock::SharedLockScope ___(m_LocationMapLock); - Entries.resize(m_LocationMap.size()); - - uint64_t EntryIndex = 0; - for (auto& Entry : m_LocationMap) - { - CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = Entry.second; - } - } - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - CasDiskIndexHeader Header = {.EntryCount = Entries.size(), - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; - - Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); - - ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - - // Restore any previous snapshot - - if (fs::is_regular_file(TempIndexPath)) - { - fs::remove(IndexPath); - fs::rename(TempIndexPath, IndexPath); - } - } - if (fs::is_regular_file(TempIndexPath)) - { - fs::remove(TempIndexPath); - } -} - -uint64_t -CasContainerStrategy::ReadIndexFile() -{ - std::vector<CasDiskIndexEntry> Entries; - std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); - if (std::filesystem::is_regular_file(IndexPath)) - { - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - Entries.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CasDiskIndexHeader)) - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); - CasDiskIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && - (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && - (Header.EntryCount <= ExpectedEntryCount)) - { - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); - m_PayloadAlignment = Header.PayloadAlignment; - - std::string InvalidEntryReason; - for (const CasDiskIndexEntry& Entry : Entries) - { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - m_LocationMap[Entry.Key] = Entry.Location; - } - - return Header.LogPosition; - } - else - { - ZEN_WARN("skipping invalid index file '{}'", IndexPath); - } - } - } - return 0; -} - -uint64_t -CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) -{ - std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); - if (std::filesystem::is_regular_file(LogPath)) - { - size_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", - m_RootDirectory / m_ContainerBaseName, - LogEntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - TCasLogFile<CasDiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - CasLog.Replay( - [&](const CasDiskIndexEntry& Record) { - LogEntryCount++; - std::string InvalidEntryReason; - if (Record.Flags & CasDiskIndexEntry::kTombstone) - { - m_LocationMap.erase(Record.Key); - return; - } - if (!ValidateEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - return; - } - m_LocationMap[Record.Key] = Record.Location; - }, - SkipEntryCount); - return LogEntryCount; - } - } - return 0; -} - -void -CasContainerStrategy::OpenContainer(bool IsNewStore) -{ - // Add .running file and delete on clean on close to detect bad termination - - m_LocationMap.clear(); - - std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName); - - if (IsNewStore) - { - std::filesystem::remove_all(BasePath); - } - - m_LogFlushPosition = ReadIndexFile(); - uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); - - CreateDirectories(BasePath); - - std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); - m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - - std::vector<BlockStoreLocation> KnownLocations; - KnownLocations.reserve(m_LocationMap.size()); - for (const auto& Entry : m_LocationMap) - { - const BlockStoreDiskLocation& Location = Entry.second; - KnownLocations.push_back(Location.Get(m_PayloadAlignment)); - } - - m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); - - if (IsNewStore || (LogEntryCount > 0)) - { - MakeIndexSnapshot(); - } - - // TODO: should validate integrity of container files here -} - -////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -namespace { - static IoBuffer CreateRandomChunk(uint64_t Size) - { - static std::random_device rd; - static std::mt19937 g(rd()); - - std::vector<uint8_t> Values; - Values.resize(Size); - for (size_t Idx = 0; Idx < Size; ++Idx) - { - Values[Idx] = static_cast<uint8_t>(Idx); - } - std::shuffle(Values.begin(), Values.end(), g); - - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); - } -} // namespace - -TEST_CASE("compactcas.hex") -{ - uint32_t Value; - std::string HexString; - CHECK(!ParseHexNumber("", Value)); - char Hex[9]; - - ToHexNumber(0u, Hex); - HexString = std::string(Hex); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == 0u); - - ToHexNumber(std::numeric_limits<std::uint32_t>::max(), Hex); - HexString = std::string(Hex); - CHECK(HexString == "ffffffff"); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == std::numeric_limits<std::uint32_t>::max()); - - ToHexNumber(0xadf14711u, Hex); - HexString = std::string(Hex); - CHECK(HexString == "adf14711"); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == 0xadf14711u); - - ToHexNumber(0x80000000u, Hex); - HexString = std::string(Hex); - CHECK(HexString == "80000000"); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == 0x80000000u); - - ToHexNumber(0x718293a4u, Hex); - HexString = std::string(Hex); - CHECK(HexString == "718293a4"); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == 0x718293a4u); -} - -TEST_CASE("compactcas.compact.gc") -{ - ScopedTemporaryDirectory TempDir; - - const int kIterationCount = 1000; - - std::vector<IoHash> Keys(kIterationCount); - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); - - for (int i = 0; i < kIterationCount; ++i) - { - CbObjectWriter Cbo; - Cbo << "id" << i; - CbObject Obj = Cbo.Save(); - - IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); - const IoHash Hash = HashBuffer(ObjBuffer); - - Cas.InsertChunk(ObjBuffer, Hash); - - Keys[i] = Hash; - } - - for (int i = 0; i < kIterationCount; ++i) - { - IoBuffer Chunk = Cas.FindChunk(Keys[i]); - - CHECK(!!Chunk); - - CbObject Value = LoadCompactBinaryObject(Chunk); - - CHECK_EQ(Value["id"].AsInt32(), i); - } - } - - // Validate that we can still read the inserted data after closing - // the original cas store - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); - - for (int i = 0; i < kIterationCount; ++i) - { - IoBuffer Chunk = Cas.FindChunk(Keys[i]); - - CHECK(!!Chunk); - - CbObject Value = LoadCompactBinaryObject(Chunk); - - CHECK_EQ(Value["id"].AsInt32(), i); - } - } -} - -TEST_CASE("compactcas.compact.totalsize") -{ - std::random_device rd; - std::mt19937 g(rd()); - - // for (uint32_t i = 0; i < 100; ++i) - { - ScopedTemporaryDirectory TempDir; - - const uint64_t kChunkSize = 1024; - const int32_t kChunkCount = 16; - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - IoBuffer Chunk = CreateRandomChunk(kChunkSize); - const IoHash Hash = HashBuffer(Chunk); - CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); - } - - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - } - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); - - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - } - - // Re-open again, this time we should have a snapshot - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); - - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - } - } -} - -TEST_CASE("compactcas.gc.basic") -{ - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); - - IoBuffer Chunk = CreateRandomChunk(128); - IoHash ChunkHash = IoHash::HashBuffer(Chunk); - - const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); - CHECK(InsertResult.New); - Cas.Flush(); - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHash)); -} - -TEST_CASE("compactcas.gc.removefile") -{ - ScopedTemporaryDirectory TempDir; - - IoBuffer Chunk = CreateRandomChunk(128); - IoHash ChunkHash = IoHash::HashBuffer(Chunk); - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); - - const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); - CHECK(InsertResult.New); - const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); - CHECK(!InsertResultDup.New); - Cas.Flush(); - } - - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false); - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHash)); -} - -TEST_CASE("compactcas.gc.compact") -{ - // for (uint32_t i = 0; i < 100; ++i) - { - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true); - - uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; - std::vector<IoBuffer> Chunks; - Chunks.reserve(9); - for (uint64_t Size : ChunkSizes) - { - Chunks.push_back(CreateRandomChunk(Size)); - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(9); - for (const IoBuffer& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New); - CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New); - CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New); - CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New); - CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New); - CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New); - CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New); - CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New); - CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New); - - CHECK(Cas.HaveChunk(ChunkHashes[0])); - CHECK(Cas.HaveChunk(ChunkHashes[1])); - CHECK(Cas.HaveChunk(ChunkHashes[2])); - CHECK(Cas.HaveChunk(ChunkHashes[3])); - CHECK(Cas.HaveChunk(ChunkHashes[4])); - CHECK(Cas.HaveChunk(ChunkHashes[5])); - CHECK(Cas.HaveChunk(ChunkHashes[6])); - CHECK(Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - // Keep first and last - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[0]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(Cas.HaveChunk(ChunkHashes[0])); - CHECK(!Cas.HaveChunk(ChunkHashes[1])); - CHECK(!Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(!Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(!Cas.HaveChunk(ChunkHashes[6])); - CHECK(!Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - - Cas.InsertChunk(Chunks[1], ChunkHashes[1]); - Cas.InsertChunk(Chunks[2], ChunkHashes[2]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[4], ChunkHashes[4]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - Cas.InsertChunk(Chunks[6], ChunkHashes[6]); - Cas.InsertChunk(Chunks[7], ChunkHashes[7]); - } - - // Keep last - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHashes[0])); - CHECK(!Cas.HaveChunk(ChunkHashes[1])); - CHECK(!Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(!Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(!Cas.HaveChunk(ChunkHashes[6])); - CHECK(!Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - - Cas.InsertChunk(Chunks[1], ChunkHashes[1]); - Cas.InsertChunk(Chunks[2], ChunkHashes[2]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[4], ChunkHashes[4]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - Cas.InsertChunk(Chunks[6], ChunkHashes[6]); - Cas.InsertChunk(Chunks[7], ChunkHashes[7]); - } - - // Keep mixed - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[1]); - KeepChunks.push_back(ChunkHashes[4]); - KeepChunks.push_back(ChunkHashes[7]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHashes[0])); - CHECK(Cas.HaveChunk(ChunkHashes[1])); - CHECK(!Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(!Cas.HaveChunk(ChunkHashes[6])); - CHECK(Cas.HaveChunk(ChunkHashes[7])); - CHECK(!Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); - - Cas.InsertChunk(Chunks[0], ChunkHashes[0]); - Cas.InsertChunk(Chunks[2], ChunkHashes[2]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - Cas.InsertChunk(Chunks[6], ChunkHashes[6]); - Cas.InsertChunk(Chunks[8], ChunkHashes[8]); - } - - // Keep multiple at end - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[6]); - KeepChunks.push_back(ChunkHashes[7]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHashes[0])); - CHECK(!Cas.HaveChunk(ChunkHashes[1])); - CHECK(!Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(!Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(Cas.HaveChunk(ChunkHashes[6])); - CHECK(Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - - Cas.InsertChunk(Chunks[0], ChunkHashes[0]); - Cas.InsertChunk(Chunks[1], ChunkHashes[1]); - Cas.InsertChunk(Chunks[2], ChunkHashes[2]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[4], ChunkHashes[4]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - } - - // Keep every other - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[0]); - KeepChunks.push_back(ChunkHashes[2]); - KeepChunks.push_back(ChunkHashes[4]); - KeepChunks.push_back(ChunkHashes[6]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(Cas.HaveChunk(ChunkHashes[0])); - CHECK(!Cas.HaveChunk(ChunkHashes[1])); - CHECK(Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(Cas.HaveChunk(ChunkHashes[6])); - CHECK(!Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - - Cas.InsertChunk(Chunks[1], ChunkHashes[1]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - Cas.InsertChunk(Chunks[7], ChunkHashes[7]); - } - - // Verify that we nicely appended blocks even after all GC operations - CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); - CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - } -} - -TEST_CASE("compactcas.gc.deleteblockonopen") -{ - ScopedTemporaryDirectory TempDir; - - uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; - std::vector<IoBuffer> Chunks; - Chunks.reserve(20); - for (uint64_t Size : ChunkSizes) - { - Chunks.push_back(CreateRandomChunk(Size)); - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(20); - for (const IoBuffer& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); - - for (size_t i = 0; i < 20; i++) - { - CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); - } - - // GC every other block - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - for (size_t i = 0; i < 20; i += 2) - { - KeepChunks.push_back(ChunkHashes[i]); - } - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - for (size_t i = 0; i < 20; i += 2) - { - CHECK(Cas.HaveChunk(ChunkHashes[i])); - CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); - CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); - } - } - } - { - // Re-open - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 1024, 16, false); - - for (size_t i = 0; i < 20; i += 2) - { - CHECK(Cas.HaveChunk(ChunkHashes[i])); - CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); - CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); - } - } -} - -TEST_CASE("compactcas.gc.handleopeniobuffer") -{ - ScopedTemporaryDirectory TempDir; - - uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; - std::vector<IoBuffer> Chunks; - Chunks.reserve(20); - for (const uint64_t& Size : ChunkSizes) - { - Chunks.push_back(CreateRandomChunk(Size)); - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(20); - for (const IoBuffer& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); - - for (size_t i = 0; i < 20; i++) - { - CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); - } - - IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]); - Cas.Flush(); - - // GC everything - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - Cas.CollectGarbage(GcCtx); - - for (size_t i = 0; i < 20; i++) - { - CHECK(!Cas.HaveChunk(ChunkHashes[i])); - } - - CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); -} - -TEST_CASE("compactcas.threadedinsert") -{ - // for (uint32_t i = 0; i < 100; ++i) - { - ScopedTemporaryDirectory TempDir; - - const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 4096; - uint64_t ExpectedSize = 0; - - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; - Chunks.reserve(kChunkCount); - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - while (true) - { - IoBuffer Chunk = CreateRandomChunk(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - if (Chunks.contains(Hash)) - { - continue; - } - Chunks[Hash] = Chunk; - ExpectedSize += Chunk.Size(); - break; - } - } - - std::atomic<size_t> WorkCompleted = 0; - WorkerThreadPool ThreadPool(4); - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); - { - for (const auto& Chunk : Chunks) - { - const IoHash& Hash = Chunk.first; - const IoBuffer& Buffer = Chunk.second; - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); - ZEN_ASSERT(InsertResult.New); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - } - - WorkCompleted = 0; - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_LE(ExpectedSize, TotalSize); - CHECK_GE(ExpectedSize + 32768, TotalSize); - - { - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - IoHash Hash = IoHash::HashBuffer(Buffer); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - } - - std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes; - GcChunkHashes.reserve(Chunks.size()); - for (const auto& Chunk : Chunks) - { - GcChunkHashes.insert(Chunk.first); - } - { - WorkCompleted = 0; - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks; - NewChunks.reserve(kChunkCount); - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - IoBuffer Chunk = CreateRandomChunk(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - NewChunks[Hash] = Chunk; - } - - std::atomic_uint32_t AddedChunkCount; - - for (const auto& Chunk : NewChunks) - { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { - Cas.InsertChunk(Chunk.second, Chunk.first); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); - } - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - if (Buffer) - { - CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); - } - WorkCompleted.fetch_add(1); - }); - } - - while (AddedChunkCount.load() < NewChunks.size()) - { - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) - { - if (Cas.HaveChunk(Chunk.first)) - { - GcChunkHashes.emplace(Chunk.first); - } - } - std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - GcCtx.AddRetainedCids(KeepHashes); - Cas.CollectGarbage(GcCtx); - const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); - } - - while (WorkCompleted < NewChunks.size() + Chunks.size()) - { - Sleep(1); - } - - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) - { - if (Cas.HaveChunk(Chunk.first)) - { - GcChunkHashes.emplace(Chunk.first); - } - } - std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - GcCtx.AddRetainedCids(KeepHashes); - Cas.CollectGarbage(GcCtx); - const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); - } - { - WorkCompleted = 0; - for (const IoHash& ChunkHash : GcChunkHashes) - { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { - CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < GcChunkHashes.size()) - { - Sleep(1); - } - } - } -} - -#endif - -void -compactcas_forcelink() -{ -} - -} // namespace zen diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h deleted file mode 100644 index b0c6699eb..000000000 --- a/zenstore/compactcas.h +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/zencore.h> -#include <zenstore/blockstore.h> -#include <zenstore/caslog.h> -#include <zenstore/gc.h> - -#include "cas.h" - -#include <atomic> -#include <limits> -#include <unordered_map> - -namespace spdlog { -class logger; -} - -namespace zen { - -////////////////////////////////////////////////////////////////////////// - -#pragma pack(push) -#pragma pack(1) - -struct CasDiskIndexEntry -{ - static const uint8_t kTombstone = 0x01; - - IoHash Key; - BlockStoreDiskLocation Location; - ZenContentType ContentType = ZenContentType::kUnknownContentType; - uint8_t Flags = 0; -}; - -#pragma pack(pop) - -static_assert(sizeof(CasDiskIndexEntry) == 32); - -/** This implements a storage strategy for small CAS values - * - * New chunks are simply appended to a small object file, and an index is - * maintained to allow chunks to be looked up within the active small object - * files - * - */ - -struct CasContainerStrategy final : public GcStorage -{ - CasContainerStrategy(GcManager& Gc); - ~CasContainerStrategy(); - - CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); - IoBuffer FindChunk(const IoHash& ChunkHash); - bool HaveChunk(const IoHash& ChunkHash); - void FilterChunks(HashKeySet& InOutChunks); - void Initialize(const std::filesystem::path& RootDirectory, - const std::string_view ContainerBaseName, - uint32_t MaxBlockSize, - uint64_t Alignment, - bool IsNewStore); - void Flush(); - void Scrub(ScrubContext& Ctx); - virtual void CollectGarbage(GcContext& GcCtx) override; - virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_BlockStore.TotalSize()}; } - -private: - CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); - void MakeIndexSnapshot(); - uint64_t ReadIndexFile(); - uint64_t ReadLog(uint64_t SkipEntryCount); - void OpenContainer(bool IsNewStore); - - spdlog::logger& Log() { return m_Log; } - - std::filesystem::path m_RootDirectory; - spdlog::logger& m_Log; - uint64_t m_PayloadAlignment = 1u << 4; - uint64_t m_MaxBlockSize = 1u << 28; - bool m_IsInitialized = false; - TCasLogFile<CasDiskIndexEntry> m_CasLog; - uint64_t m_LogFlushPosition = 0; - std::string m_ContainerBaseName; - std::filesystem::path m_BlocksBasePath; - BlockStore m_BlockStore; - - RwLock m_LocationMapLock; - typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t; - LocationMap_t m_LocationMap; -}; - -void compactcas_forcelink(); - -} // namespace zen diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp deleted file mode 100644 index 1d25920c4..000000000 --- a/zenstore/filecas.cpp +++ /dev/null @@ -1,1452 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "filecas.h" - -#include <zencore/compress.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/memory.h> -#include <zencore/scopeguard.h> -#include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> -#include <zencore/thread.h> -#include <zencore/timer.h> -#include <zencore/uid.h> -#include <zenstore/gc.h> -#include <zenstore/scrubcontext.h> -#include <zenutil/basicfile.h> - -#if ZEN_WITH_TESTS -# include <zencore/compactbinarybuilder.h> -#endif - -#include <gsl/gsl-lite.hpp> - -#include <barrier> -#include <filesystem> -#include <functional> -#include <unordered_map> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <xxhash.h> -#if ZEN_PLATFORM_WINDOWS -# include <atlfile.h> -#endif -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -namespace filecas::impl { - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".ulog"; - - std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", IndexExtension); } - - std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir) - { - return RootDir / fmt::format("cas.tmp{}", IndexExtension); - } - - std::filesystem::path GetLogPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", LogExtension); } - -#pragma pack(push) -#pragma pack(1) - - struct FileCasIndexHeader - { - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t CurrentVersion = 1; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t EntryCount = 0; - uint64_t LogPosition = 0; - uint32_t Reserved = 0; - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const FileCasIndexHeader& Header) - { - return XXH32(&Header.Magic, sizeof(FileCasIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } - }; - - static_assert(sizeof(FileCasIndexHeader) == 32); - -#pragma pack(pop) - -} // namespace filecas::impl - -FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) -{ - ShardedPath.Append(RootPath.c_str()); - - ExtendableStringBuilder<64> HashString; - ChunkHash.ToHexString(HashString); - - const char* str = HashString.c_str(); - - // Shard into a path with two directory levels containing 12 bits and 8 bits - // respectively. - // - // This results in a maximum of 4096 * 256 directories - // - // The numbers have been chosen somewhat arbitrarily but are large to scale - // to very large chunk repositories without creating too many directories - // on a single level since NTFS does not deal very well with this. - // - // It may or may not make sense to make this a configurable policy, and it - // would probably be a good idea to measure performance for different - // policies and chunk counts - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str, str + 3); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 3, str + 5); - Shard2len = ShardedPath.Size(); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 5, str + 40); -} - -////////////////////////////////////////////////////////////////////////// - -FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas")) -{ -} - -FileCasStrategy::~FileCasStrategy() -{ -} - -void -FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore) -{ - using namespace filecas::impl; - - m_IsInitialized = true; - - m_RootDirectory = RootDirectory; - - m_Index.clear(); - - std::filesystem::path LogPath = GetLogPath(m_RootDirectory); - std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); - - if (IsNewStore) - { - std::filesystem::remove(LogPath); - std::filesystem::remove(IndexPath); - - if (std::filesystem::is_directory(m_RootDirectory)) - { - // We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside - // in this folder as well - struct Visitor : public FileSystemTraversal::TreeVisitor - { - virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override - { - // We don't care about files - } - static bool IsHexChar(std::filesystem::path::value_type C) - { - return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16]; - } - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& DirectoryName) override - { - if (DirectoryName.length() == 3) - { - if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2])) - { - ShardedRoots.push_back(Parent / DirectoryName); - } - } - return false; - } - std::vector<std::filesystem::path> ShardedRoots; - } CasVisitor; - - FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); - for (const std::filesystem::path& SharededRoot : CasVisitor.ShardedRoots) - { - std::filesystem::remove_all(SharededRoot); - } - } - } - - m_LogFlushPosition = ReadIndexFile(); - uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); - for (const auto& Entry : m_Index) - { - m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed); - } - - CreateDirectories(m_RootDirectory); - m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - - if (IsNewStore || LogEntryCount > 0) - { - MakeIndexSnapshot(); - } -} - -#if ZEN_PLATFORM_WINDOWS -static void -DeletePayloadFileOnClose(const void* FileHandle) -{ - const HANDLE WinFileHandle = (const HANDLE)FileHandle; - // This will cause the file to be deleted when the last handle to it is closed - FILE_DISPOSITION_INFO Fdi{}; - Fdi.DeleteFile = TRUE; - BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); - - if (!Success) - { - // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed - // to delete it. - ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'", - PathFromHandle(WinFileHandle), - GetLastErrorAsString()); - } -} -#endif - -CasStore::InsertResult -FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) -{ - ZEN_ASSERT(m_IsInitialized); - -#if !ZEN_WITH_TESTS - ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); -#endif - - if (Mode == CasStore::InsertMode::kCopyOnly) - { - { - RwLock::SharedLockScope _(m_Lock); - if (m_Index.contains(ChunkHash)) - { - return CasStore::InsertResult{.New = false}; - } - } - return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); - } - - // File-based chunks have special case handling whereby we move the file into - // place in the file store directory, thus avoiding unnecessary copying - - IoBufferFileReference FileRef; - if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) - { - { - bool Exists = true; - { - RwLock::SharedLockScope _(m_Lock); - Exists = m_Index.contains(ChunkHash); - } - if (Exists) - { -#if ZEN_PLATFORM_WINDOWS - DeletePayloadFileOnClose(FileRef.FileHandle); -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle); - if (unlink(FilePath.c_str()) < 0) - { - int UnlinkError = zen::GetLastError(); - if (UnlinkError != ENOENT) - { - ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", - FilePath.string(), - GetSystemErrorAsString(UnlinkError)); - } - } -#endif - return CasStore::InsertResult{.New = false}; - } - } - - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); - -#if ZEN_PLATFORM_WINDOWS - const HANDLE ChunkFileHandle = FileRef.FileHandle; - // See if file already exists - { - CAtlFile PayloadFile; - - if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) - { - // If we succeeded in opening the target file then we don't need to do anything else because it already exists - // and should contain the content we were about to insert - - // We do need to ensure the source file goes away on close, however - size_t ChunkSize = Chunk.GetSize(); - uint64_t FileSize = 0; - if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) - { - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - - DeletePayloadFileOnClose(ChunkFileHandle); - - return CasStore::InsertResult{.New = IsNew}; - } - else - { - ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", - Name.ShardedPath.ToUtf8(), - ChunkSize, - FileSize); - } - } - else - { - if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) - { - // Shard directory does not exist - } - else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) - { - // Shard directory exists, but not the file - } - else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION)) - { - // Sharing violation, likely because we are trying to open a file - // which has been renamed on another thread, and the file handle - // used to rename it is still open. We handle this case below - // instead of here - } - else - { - ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes); - } - } - } - - std::filesystem::path FullPath(Name.ShardedPath.c_str()); - - std::filesystem::path FilePath = FullPath.parent_path(); - std::wstring FileName = FullPath.native(); - - const DWORD BufferSize = sizeof(FILE_RENAME_INFO) + gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR)); - FILE_RENAME_INFO* RenameInfo = reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize)); - memset(RenameInfo, 0, BufferSize); - - RenameInfo->ReplaceIfExists = FALSE; - RenameInfo->FileNameLength = gsl::narrow<DWORD>(FileName.size()); - memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR)); - RenameInfo->FileName[FileName.size()] = 0; - - auto $ = MakeGuard([&] { Memory::Free(RenameInfo); }); - - // Try to move file into place - BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); - - if (!Success) - { - // The rename/move could fail because the target directory does not yet exist. This code attempts - // to create it - - CAtlFile DirHandle; - - auto InternalCreateDirectoryHandle = [&] { - return DirHandle.Create(FilePath.c_str(), - GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - OPEN_EXISTING, - FILE_FLAG_BACKUP_SEMANTICS); - }; - - // It's possible for several threads to enter this logic trying to create the same - // directory. Only one will create the directory of course, but all threads will - // make it through okay - - HRESULT hRes = InternalCreateDirectoryHandle(); - - if (FAILED(hRes)) - { - // TODO: we can handle directory creation more intelligently and efficiently than - // this currently does - - CreateDirectories(FilePath.c_str()); - - hRes = InternalCreateDirectoryHandle(); - } - - if (FAILED(hRes)) - { - ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath)); - } - - // Retry rename/move - - Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); - } - - if (Success) - { - m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); - - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - - return CasStore::InsertResult{.New = IsNew}; - } - - const DWORD LastError = GetLastError(); - - if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) - { - HashLock.ReleaseNow(); - DeletePayloadFileOnClose(ChunkFileHandle); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - - return CasStore::InsertResult{.New = IsNew}; - } - - ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}", - GetSystemErrorAsString(LastError), - ChunkHash); - - DeletePayloadFileOnClose(ChunkFileHandle); - -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); - std::filesystem::path DestPath = Name.ShardedPath.c_str(); - int Ret = link(SourcePath.c_str(), DestPath.c_str()); - if (Ret < 0 && zen::GetLastError() == ENOENT) - { - // Destination directory doesn't exist. Create it any try again. - CreateDirectories(DestPath.parent_path().c_str()); - Ret = link(SourcePath.c_str(), DestPath.c_str()); - } - int LinkError = zen::GetLastError(); - - if (unlink(SourcePath.c_str()) < 0) - { - int UnlinkError = zen::GetLastError(); - if (UnlinkError != ENOENT) - { - ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", - SourcePath.string(), - GetSystemErrorAsString(UnlinkError)); - } - } - - // It is possible that someone beat us to it in linking the file. In that - // case a "file exists" error is okay. All others are not. - if (Ret < 0) - { - if (LinkError == EEXIST) - { - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } - - ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}", - GetSystemErrorAsString(LinkError), - ChunkHash); - } - else - { - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } -#endif // ZEN_PLATFORM_* - } - - return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); -} - -CasStore::InsertResult -FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) -{ - ZEN_ASSERT(m_IsInitialized); - - { - RwLock::SharedLockScope _(m_Lock); - if (m_Index.contains(ChunkHash)) - { - return {.New = false}; - } - } - - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - // See if file already exists - -#if ZEN_PLATFORM_WINDOWS - CAtlFile PayloadFile; - - HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); - - if (SUCCEEDED(hRes)) - { - // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the - // content we were about to insert - - bool IsNew = false; - { - RwLock::ExclusiveLockScope _(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } - - PayloadFile.Close(); -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - if (access(Name.ShardedPath.c_str(), F_OK) == 0) - { - return CasStore::InsertResult{.New = false}; - } -#endif - - RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); - -#if ZEN_PLATFORM_WINDOWS - // For now, use double-checked locking to see if someone else was first - - hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); - - if (SUCCEEDED(hRes)) - { - uint64_t FileSize = 0; - if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) - { - // If we succeeded in opening the file then and the size is correct we don't need to do anything - // else because someone else managed to create the file before we did. Just return. - - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } - else - { - ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", - Name.ShardedPath.ToUtf8(), - ChunkSize, - FileSize); - } - } - - if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))) - { - ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes)); - } - - auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; - - hRes = InternalCreateFile(); - - if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) - { - // Ensure parent directories exist and retry file creation - CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len)); - hRes = InternalCreateFile(); - } - - if (FAILED(hRes)) - { - ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8())); - } -#else - // Attempt to exclusively create the file. - auto InternalCreateFile = [&] { - int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); - if (Fd >= 0) - { - fchmod(Fd, 0666); - } - return Fd; - }; - int Fd = InternalCreateFile(); - if (Fd < 0) - { - switch (zen::GetLastError()) - { - case EEXIST: - // Another thread has beat us to it so we're golden. - { - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - return {.New = IsNew}; - } - break; - - case ENOENT: - if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len))) - { - Fd = InternalCreateFile(); - if (Fd >= 0) - { - break; - } - } - ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath)); - - default: - ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8())); - } - } - - struct FdWrapper - { - ~FdWrapper() { Close(); } - void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); } - void Close() - { - if (Fd >= 0) - { - close(Fd); - Fd = -1; - } - } - int Fd; - } PayloadFile = {Fd}; -#endif // ZEN_PLATFORM_WINDOWS - - size_t ChunkRemain = ChunkSize; - auto ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData); - - while (ChunkRemain != 0) - { - uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain)); - - PayloadFile.Write(ChunkCursor, ByteCount); - - ChunkCursor += ByteCount; - ChunkRemain -= ByteCount; - } - - // We cannot rely on RAII to close the file handle since it would be closed - // *after* the lock is released due to the initialization order - PayloadFile.Close(); - - m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); - - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - - return {.New = IsNew}; -} - -IoBuffer -FileCasStrategy::FindChunk(const IoHash& ChunkHash) -{ - ZEN_ASSERT(m_IsInitialized); - - { - RwLock::SharedLockScope _(m_Lock); - if (!m_Index.contains(ChunkHash)) - { - return {}; - } - } - - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - RwLock::SharedLockScope _(LockForHash(ChunkHash)); - - return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); -} - -bool -FileCasStrategy::HaveChunk(const IoHash& ChunkHash) -{ - ZEN_ASSERT(m_IsInitialized); - - RwLock::SharedLockScope _(m_Lock); - return m_Index.contains(ChunkHash); -} - -void -FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) -{ - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec)); - if (Ec) - { - ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8()); - FileSize = 0; - } - - ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize)); - std::filesystem::remove(Name.ShardedPath.c_str(), Ec); - - if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str())) - { - { - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) - { - m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); - m_Index.erase(It); - } - } - m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize}); - } -} - -void -FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) -{ - ZEN_ASSERT(m_IsInitialized); - - // NOTE: it's not a problem now, but in the future if a GC should happen while this - // is in flight, the result could be wrong since chunks could go away in the meantime. - // - // It would be good to have a pinning mechanism to make this less likely but - // given that chunks could go away at any point after the results are returned to - // a caller, this is something which needs to be taken into account by anyone consuming - // this functionality in any case - - InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); -} - -void -FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback) -{ - ZEN_ASSERT(m_IsInitialized); - - RwLock::SharedLockScope _(m_Lock); - for (const auto& It : m_Index) - { - const IoHash& NameHash = It.first; - ShardingHelper Name(m_RootDirectory.c_str(), NameHash); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); - Callback(NameHash, std::move(Payload)); - } -} - -void -FileCasStrategy::Flush() -{ - // Since we don't keep files open after writing there's nothing specific - // to flush here. - // - // Depending on what semantics we want Flush() to provide, it could be - // argued that this should just flush the volume which we are using to - // store the CAS files on here, to ensure metadata is flushed along - // with file data - // - // Related: to facilitate more targeted validation during recovery we could - // maintain a log of when chunks were created -} - -void -FileCasStrategy::Scrub(ScrubContext& Ctx) -{ - ZEN_ASSERT(m_IsInitialized); - - std::vector<IoHash> BadHashes; - uint64_t ChunkCount{0}, ChunkBytes{0}; - - { - std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); - RwLock::ExclusiveLockScope _(m_Lock); - for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) - { - if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed); - m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size}); - } - } - } - - IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { - if (!Payload) - { - BadHashes.push_back(Hash); - return; - } - ++ChunkCount; - ChunkBytes += Payload.GetSize(); - - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize)) - { - if (RawHash != Hash) - { - // Hash mismatch - BadHashes.push_back(Hash); - return; - } - return; - } -#if ZEN_WITH_TESTS - IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload)))); - if (ComputedHash == Hash) - { - return; - } -#endif - BadHashes.push_back(Hash); - }); - - Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - - if (!BadHashes.empty()) - { - ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size()); - - if (Ctx.RunRecovery()) - { - ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); - - for (const IoHash& Hash : BadHashes) - { - std::error_code Ec; - DeleteChunk(Hash, Ec); - - if (Ec) - { - ZEN_WARN("failed to delete file for chunk {}", Hash); - } - } - } - } - - // Let whomever it concerns know about the bad chunks. This could - // be used to invalidate higher level data structures more efficiently - // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadHashes); - - ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); -} - -void -FileCasStrategy::CollectGarbage(GcContext& GcCtx) -{ - ZEN_ASSERT(m_IsInitialized); - - ZEN_DEBUG("collecting garbage from {}", m_RootDirectory); - - std::vector<IoHash> ChunksToDelete; - std::atomic<uint64_t> ChunksToDeleteBytes{0}; - std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; - - std::vector<IoHash> CandidateCas; - CandidateCas.resize(1); - - uint64_t DeletedCount = 0; - uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); - - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { - ZEN_DEBUG("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", - m_RootDirectory, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - DeletedCount, - ChunkCount, - NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)), - NiceBytes(OldTotalSize)); - }); - - IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { - bool KeepThis = false; - CandidateCas[0] = Hash; - GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { - ZEN_UNUSED(Hash); - KeepThis = true; - }); - - const uint64_t FileSize = Payload.GetSize(); - - if (!KeepThis) - { - ChunksToDelete.push_back(Hash); - ChunksToDeleteBytes.fetch_add(FileSize); - } - - ++ChunkCount; - ChunkBytes.fetch_add(FileSize); - }); - - // TODO, any entires we did not encounter during our IterateChunks should be removed from the index - - if (ChunksToDelete.empty()) - { - ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory); - return; - } - - ZEN_DEBUG("deleting file CAS garbage for '{}': {} out of {} chunks ({})", - m_RootDirectory, - ChunksToDelete.size(), - ChunkCount.load(), - NiceBytes(ChunksToDeleteBytes)); - - if (GcCtx.IsDeletionMode() == false) - { - ZEN_DEBUG("NOTE: not actually deleting anything since deletion is disabled"); - - return; - } - - for (const IoHash& Hash : ChunksToDelete) - { - ZEN_TRACE("deleting chunk {}", Hash); - - std::error_code Ec; - DeleteChunk(Hash, Ec); - - if (Ec) - { - ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message()); - continue; - } - DeletedCount++; - } - - GcCtx.AddDeletedCids(ChunksToDelete); -} - -bool -FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason) -{ - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if (Entry.Flags & (~FileCasIndexEntry::kTombStone)) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); - return false; - } - if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) - { - return true; - } - uint64_t Size = Entry.Size; - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return false; - } - return true; -} - -void -FileCasStrategy::MakeIndexSnapshot() -{ - using namespace filecas::impl; - - uint64_t LogCount = m_CasLog.GetLogCount(); - if (m_LogFlushPosition == LogCount) - { - return; - } - ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_RootDirectory, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - fs::path IndexPath = GetIndexPath(m_RootDirectory); - fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, STmpIndexPath); - } - - try - { - // Write the current state of the location map to a new index state - std::vector<FileCasIndexEntry> Entries; - - { - Entries.resize(m_Index.size()); - - uint64_t EntryIndex = 0; - for (auto& Entry : m_Index) - { - FileCasIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Size = Entry.second.Size; - } - } - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - filecas::impl::FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount}; - - Header.Checksum = filecas::impl::FileCasIndexHeader::ComputeChecksum(Header); - - ObjectIndexFile.Write(&Header, sizeof(filecas::impl::FileCasIndexHeader), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(filecas::impl::FileCasIndexHeader)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - - // Restore any previous snapshot - - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(IndexPath); - fs::rename(STmpIndexPath, IndexPath); - } - } - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } -} -uint64_t -FileCasStrategy::ReadIndexFile() -{ - using namespace filecas::impl; - - std::vector<FileCasIndexEntry> Entries; - std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); - if (std::filesystem::is_regular_file(IndexPath)) - { - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - Entries.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(FileCasIndexHeader)) - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(FileCasIndexHeader))) / sizeof(FileCasIndexEntry); - FileCasIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) && - (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount)) - { - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader)); - - std::string InvalidEntryReason; - for (const FileCasIndexEntry& Entry : Entries) - { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); - } - - return Header.LogPosition; - } - else - { - ZEN_WARN("skipping invalid index file '{}'", IndexPath); - } - } - return 0; - } - - if (std::filesystem::is_directory(m_RootDirectory)) - { - ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory); - TCasLogFile<FileCasIndexEntry> CasLog; - uint64_t TotalSize = 0; - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { - ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}", - m_RootDirectory, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - CasLog.GetLogCount(), - NiceBytes(TotalSize)); - }); - - std::filesystem::path LogPath = GetLogPath(m_RootDirectory); - - std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); - CasLog.Open(LogPath, CasLogFile::Mode::kTruncate); - std::string InvalidEntryReason; - for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) - { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason); - continue; - } - m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); - CasLog.Append(Entry); - } - - CasLog.Close(); - } - - return 0; -} - -uint64_t -FileCasStrategy::ReadLog(uint64_t SkipEntryCount) -{ - using namespace filecas::impl; - - std::filesystem::path LogPath = GetLogPath(m_RootDirectory); - if (std::filesystem::is_regular_file(LogPath)) - { - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - TCasLogFile<FileCasIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - m_Index.reserve(LogEntryCount); - uint64_t InvalidEntryCount = 0; - CasLog.Replay( - [&](const FileCasIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Flags & FileCasIndexEntry::kTombStone) - { - m_Index.erase(Record.Key); - return; - } - if (!ValidateEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size}); - }, - SkipEntryCount); - if (InvalidEntryCount) - { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, LogPath); - } - return LogEntryCount; - } - } - return 0; -} - -std::vector<FileCasStrategy::FileCasIndexEntry> -FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) -{ - using namespace filecas::impl; - - std::vector<FileCasIndexEntry> Entries; - struct Visitor : public FileSystemTraversal::TreeVisitor - { - Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {} - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override - { - std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); - - std::filesystem::path::string_type PathString = RelPath.native(); - - if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) - { - if (PathString.at(3) == std::filesystem::path::preferred_separator) - { - PathString.erase(3, 1); - } - PathString.append(File); - - // TODO: should validate that we're actually dealing with a valid hex string here -#if ZEN_PLATFORM_WINDOWS - StringBuilder<64> Utf8; - WideToUtf8(PathString, Utf8); - IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); -#else - IoHash NameHash = IoHash::FromHexString(PathString); -#endif - Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize}); - } - } - - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& DirectoryName) override - { - return true; - } - - const std::filesystem::path& RootDirectory; - std::vector<FileCasIndexEntry>& Entries; - } CasVisitor{RootDir, Entries}; - - FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(RootDir, CasVisitor); - return Entries; -}; - - ////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -TEST_CASE("cas.file.move") -{ - // specifying an absolute path here can be helpful when using procmon to dig into things - ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - - GcManager Gc; - - FileCasStrategy FileCas(Gc); - FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); - - { - std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; - - IoBuffer ZeroBytes{1024 * 1024}; - IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes); - - BasicFile PayloadFile; - PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate); - PayloadFile.Write(ZeroBytes, 0); - PayloadFile.Close(); - - IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path); - - CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash); - CHECK_EQ(Result.New, true); - } - -# if 0 - SUBCASE("stresstest") - { - std::vector<IoHash> PayloadHashes; - - const int kWorkers = 64; - const int kItemCount = 128; - - for (int w = 0; w < kWorkers; ++w) - { - for (int i = 0; i < kItemCount; ++i) - { - IoBuffer Payload{1024}; - *reinterpret_cast<int*>(Payload.MutableData()) = i; - PayloadHashes.push_back(IoHash::HashBuffer(Payload)); - - std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; - WriteFile(PayloadPath, Payload); - } - } - - std::barrier Sync{kWorkers}; - - auto PopulateAll = [&](int w) { - std::vector<IoBuffer> Buffers; - - for (int i = 0; i < kItemCount; ++i) - { - std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; - IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath); - Buffers.push_back(Payload); - Sync.arrive_and_wait(); - CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]); - } - }; - - std::vector<std::jthread> Threads; - - for (int i = 0; i < kWorkers; ++i) - { - Threads.push_back(std::jthread(PopulateAll, i)); - } - - for (std::jthread& Thread : Threads) - { - Thread.join(); - } - } -# endif -} - -TEST_CASE("cas.file.gc") -{ - // specifying an absolute path here can be helpful when using procmon to dig into things - ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - - GcManager Gc; - FileCasStrategy FileCas(Gc); - FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); - - const int kIterationCount = 1000; - std::vector<IoHash> Keys{kIterationCount}; - - auto InsertChunks = [&] { - for (int i = 0; i < kIterationCount; ++i) - { - CbObjectWriter Cbo; - Cbo << "id" << i; - CbObject Obj = Cbo.Save(); - - IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); - IoHash Hash = HashBuffer(ObjBuffer); - - FileCas.InsertChunk(ObjBuffer, Hash); - - Keys[i] = Hash; - } - }; - - // Drop everything - - { - InsertChunks(); - - GcContext Ctx(GcClock::Now() - std::chrono::hours(24)); - FileCas.CollectGarbage(Ctx); - - for (const IoHash& Key : Keys) - { - IoBuffer Chunk = FileCas.FindChunk(Key); - - CHECK(!Chunk); - } - } - - // Keep roughly half of the chunks - - { - InsertChunks(); - - GcContext Ctx(GcClock::Now() - std::chrono::hours(24)); - - for (const IoHash& Key : Keys) - { - if (Key.Hash[0] & 1) - { - Ctx.AddRetainedCids(std::vector<IoHash>{Key}); - } - } - - FileCas.CollectGarbage(Ctx); - - for (const IoHash& Key : Keys) - { - if (Key.Hash[0] & 1) - { - CHECK(FileCas.FindChunk(Key)); - } - else - { - CHECK(!FileCas.FindChunk(Key)); - } - } - } -} - -#endif - -void -filecas_forcelink() -{ -} - -} // namespace zen diff --git a/zenstore/filecas.h b/zenstore/filecas.h deleted file mode 100644 index 420b3a634..000000000 --- a/zenstore/filecas.h +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/zencore.h> - -#include <zencore/filesystem.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/thread.h> -#include <zenstore/caslog.h> -#include <zenstore/gc.h> - -#include "cas.h" - -#include <atomic> -#include <functional> - -namespace spdlog { -class logger; -} - -namespace zen { - -class BasicFile; - -/** CAS storage strategy using a file-per-chunk storage strategy - */ - -struct FileCasStrategy final : public GcStorage -{ - FileCasStrategy(GcManager& Gc); - ~FileCasStrategy(); - - void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore); - CasStore::InsertResult InsertChunk(IoBuffer Chunk, - const IoHash& ChunkHash, - CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace); - IoBuffer FindChunk(const IoHash& ChunkHash); - bool HaveChunk(const IoHash& ChunkHash); - void FilterChunks(HashKeySet& InOutChunks); - void Flush(); - void Scrub(ScrubContext& Ctx); - virtual void CollectGarbage(GcContext& GcCtx) override; - virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; } - -private: - void MakeIndexSnapshot(); - uint64_t ReadIndexFile(); - uint64_t ReadLog(uint64_t LogPosition); - - struct IndexEntry - { - uint64_t Size = 0; - }; - using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>; - - CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); - - std::filesystem::path m_RootDirectory; - RwLock m_Lock; - IndexMap m_Index; - RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines - spdlog::logger& m_Log; - spdlog::logger& Log() { return m_Log; } - std::atomic_uint64_t m_TotalSize{}; - bool m_IsInitialized = false; - - struct FileCasIndexEntry - { - static const uint32_t kTombStone = 0x0000'0001; - - bool IsFlagSet(const uint32_t Flag) const { return (Flags & kTombStone) == Flag; } - - IoHash Key; - uint32_t Flags = 0; - uint64_t Size = 0; - }; - static bool ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason); - static std::vector<FileCasStrategy::FileCasIndexEntry> ScanFolderForCasFiles(const std::filesystem::path& RootDir); - - static_assert(sizeof(FileCasIndexEntry) == 32); - - TCasLogFile<FileCasIndexEntry> m_CasLog; - uint64_t m_LogFlushPosition = 0; - - inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } - void IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback); - void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec); - - struct ShardingHelper - { - ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash); - - size_t Shard2len = 0; - ExtendablePathBuilder<128> ShardedPath; - }; -}; - -void filecas_forcelink(); - -} // namespace zen diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp deleted file mode 100644 index 370c3c965..000000000 --- a/zenstore/gc.cpp +++ /dev/null @@ -1,1312 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenstore/gc.h> - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> -#include <zencore/timer.h> -#include <zenstore/cidstore.h> - -#include "cas.h" - -#include <fmt/format.h> -#include <filesystem> - -#if ZEN_PLATFORM_WINDOWS -# include <zencore/windows.h> -#else -# include <fcntl.h> -# include <sys/file.h> -# include <sys/stat.h> -# include <unistd.h> -#endif - -#if ZEN_WITH_TESTS -# include <zencore/compress.h> -# include <algorithm> -# include <random> -#endif - -template<> -struct fmt::formatter<zen::GcClock::TimePoint> : formatter<string_view> -{ - template<typename FormatContext> - auto format(const zen::GcClock::TimePoint& TimePoint, FormatContext& ctx) - { - std::time_t Time = std::chrono::system_clock::to_time_t(TimePoint); - zen::ExtendableStringBuilder<32> String; - String << std::ctime(&Time); - return formatter<string_view>::format(String.ToView(), ctx); - } -}; - -namespace zen { - -using namespace std::literals; -namespace fs = std::filesystem; - -////////////////////////////////////////////////////////////////////////// - -namespace { - std::error_code CreateGCReserve(const std::filesystem::path& Path, uint64_t Size) - { - if (Size == 0) - { - std::filesystem::remove(Path); - return std::error_code{}; - } - CreateDirectories(Path.parent_path()); - if (std::filesystem::is_regular_file(Path) && std::filesystem::file_size(Path) == Size) - { - return std::error_code(); - } -#if ZEN_PLATFORM_WINDOWS - DWORD dwCreationDisposition = CREATE_ALWAYS; - DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE; - - const DWORD dwShareMode = 0; - const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; - HANDLE hTemplateFile = nullptr; - - HANDLE FileHandle = CreateFile(Path.c_str(), - dwDesiredAccess, - dwShareMode, - /* lpSecurityAttributes */ nullptr, - dwCreationDisposition, - dwFlagsAndAttributes, - hTemplateFile); - - if (FileHandle == INVALID_HANDLE_VALUE) - { - return MakeErrorCodeFromLastError(); - } - bool Keep = true; - auto _ = MakeGuard([&]() { - ::CloseHandle(FileHandle); - if (!Keep) - { - ::DeleteFile(Path.c_str()); - } - }); - LARGE_INTEGER liFileSize; - liFileSize.QuadPart = Size; - BOOL OK = ::SetFilePointerEx(FileHandle, liFileSize, 0, FILE_BEGIN); - if (!OK) - { - return MakeErrorCodeFromLastError(); - } - OK = ::SetEndOfFile(FileHandle); - if (!OK) - { - return MakeErrorCodeFromLastError(); - } - Keep = true; -#else - int OpenFlags = O_CLOEXEC | O_RDWR | O_CREAT; - int Fd = open(Path.c_str(), OpenFlags, 0666); - if (Fd < 0) - { - return MakeErrorCodeFromLastError(); - } - - bool Keep = true; - auto _ = MakeGuard([&]() { - close(Fd); - if (!Keep) - { - unlink(Path.c_str()); - } - }); - - if (fchmod(Fd, 0666) < 0) - { - return MakeErrorCodeFromLastError(); - } - -# if ZEN_PLATFORM_MAC - if (ftruncate(Fd, (off_t)Size) < 0) - { - return MakeErrorCodeFromLastError(); - } -# else - if (ftruncate64(Fd, (off64_t)Size) < 0) - { - return MakeErrorCodeFromLastError(); - } - int Error = posix_fallocate64(Fd, 0, (off64_t)Size); - if (Error) - { - return MakeErrorCode(Error); - } -# endif - Keep = true; -#endif - return std::error_code{}; - } - -} // namespace - -////////////////////////////////////////////////////////////////////////// - -CbObject -LoadCompactBinaryObject(const fs::path& Path) -{ - FileContents Result = ReadFile(Path); - - if (!Result.ErrorCode) - { - IoBuffer Buffer = Result.Flatten(); - if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) - { - return LoadCompactBinaryObject(Buffer); - } - } - - return CbObject(); -} - -void -SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) -{ - WriteFile(Path, Object.GetBuffer().AsIoBuffer()); -} - -////////////////////////////////////////////////////////////////////////// - -struct GcContext::GcState -{ - using CacheKeyContexts = std::unordered_map<std::string, std::vector<IoHash>>; - - CacheKeyContexts m_ExpiredCacheKeys; - HashKeySet m_RetainedCids; - HashKeySet m_DeletedCids; - GcClock::TimePoint m_ExpireTime; - bool m_DeletionMode = true; - bool m_CollectSmallObjects = false; - - std::filesystem::path DiskReservePath; -}; - -GcContext::GcContext(const GcClock::TimePoint& ExpireTime) : m_State(std::make_unique<GcState>()) -{ - m_State->m_ExpireTime = ExpireTime; -} - -GcContext::~GcContext() -{ -} - -void -GcContext::AddRetainedCids(std::span<const IoHash> Cids) -{ - m_State->m_RetainedCids.AddHashesToSet(Cids); -} - -void -GcContext::SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys) -{ - m_State->m_ExpiredCacheKeys[CacheKeyContext] = std::move(ExpiredKeys); -} - -void -GcContext::IterateCids(std::function<void(const IoHash&)> Callback) -{ - m_State->m_RetainedCids.IterateHashes([&](const IoHash& Hash) { Callback(Hash); }); -} - -void -GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc) -{ - m_State->m_RetainedCids.FilterHashes(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); -} - -void -GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc) -{ - m_State->m_RetainedCids.FilterHashes(Cid, std::move(FilterFunc)); -} - -void -GcContext::AddDeletedCids(std::span<const IoHash> Cas) -{ - m_State->m_DeletedCids.AddHashesToSet(Cas); -} - -const HashKeySet& -GcContext::DeletedCids() -{ - return m_State->m_DeletedCids; -} - -std::span<const IoHash> -GcContext::ExpiredCacheKeys(const std::string& CacheKeyContext) const -{ - return m_State->m_ExpiredCacheKeys[CacheKeyContext]; -} - -bool -GcContext::IsDeletionMode() const -{ - return m_State->m_DeletionMode; -} - -void -GcContext::SetDeletionMode(bool NewState) -{ - m_State->m_DeletionMode = NewState; -} - -bool -GcContext::CollectSmallObjects() const -{ - return m_State->m_CollectSmallObjects; -} - -void -GcContext::CollectSmallObjects(bool NewState) -{ - m_State->m_CollectSmallObjects = NewState; -} - -GcClock::TimePoint -GcContext::ExpireTime() const -{ - return m_State->m_ExpireTime; -} - -void -GcContext::DiskReservePath(const std::filesystem::path& Path) -{ - m_State->DiskReservePath = Path; -} - -uint64_t -GcContext::ClaimGCReserve() -{ - if (!std::filesystem::is_regular_file(m_State->DiskReservePath)) - { - return 0; - } - uint64_t ReclaimedSize = std::filesystem::file_size(m_State->DiskReservePath); - if (std::filesystem::remove(m_State->DiskReservePath)) - { - return ReclaimedSize; - } - return 0; -} - -////////////////////////////////////////////////////////////////////////// - -GcContributor::GcContributor(GcManager& Gc) : m_Gc(Gc) -{ - m_Gc.AddGcContributor(this); -} - -GcContributor::~GcContributor() -{ - m_Gc.RemoveGcContributor(this); -} - -////////////////////////////////////////////////////////////////////////// - -GcStorage::GcStorage(GcManager& Gc) : m_Gc(Gc) -{ - m_Gc.AddGcStorage(this); -} - -GcStorage::~GcStorage() -{ - m_Gc.RemoveGcStorage(this); -} - -////////////////////////////////////////////////////////////////////////// - -GcManager::GcManager() : m_Log(logging::Get("gc")) -{ -} - -GcManager::~GcManager() -{ -} - -void -GcManager::AddGcContributor(GcContributor* Contributor) -{ - RwLock::ExclusiveLockScope _(m_Lock); - m_GcContribs.push_back(Contributor); -} - -void -GcManager::RemoveGcContributor(GcContributor* Contributor) -{ - RwLock::ExclusiveLockScope _(m_Lock); - std::erase_if(m_GcContribs, [&](GcContributor* $) { return $ == Contributor; }); -} - -void -GcManager::AddGcStorage(GcStorage* Storage) -{ - ZEN_ASSERT(Storage != nullptr); - RwLock::ExclusiveLockScope _(m_Lock); - m_GcStorage.push_back(Storage); -} - -void -GcManager::RemoveGcStorage(GcStorage* Storage) -{ - RwLock::ExclusiveLockScope _(m_Lock); - std::erase_if(m_GcStorage, [&](GcStorage* $) { return $ == Storage; }); -} - -void -GcManager::CollectGarbage(GcContext& GcCtx) -{ - RwLock::SharedLockScope _(m_Lock); - - // First gather reference set - { - Stopwatch Timer; - const auto Guard = MakeGuard([&] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (GcContributor* Contributor : m_GcContribs) - { - Contributor->GatherReferences(GcCtx); - } - } - - // Then trim storage - { - GcStorageSize GCTotalSizeDiff; - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - ZEN_INFO("collected garbage in {}. Removed {} disk space, {} memory", - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(GCTotalSizeDiff.DiskSize), - NiceBytes(GCTotalSizeDiff.MemorySize)); - }); - for (GcStorage* Storage : m_GcStorage) - { - const auto PreSize = Storage->StorageSize(); - Storage->CollectGarbage(GcCtx); - const auto PostSize = Storage->StorageSize(); - GCTotalSizeDiff.DiskSize += PreSize.DiskSize > PostSize.DiskSize ? PreSize.DiskSize - PostSize.DiskSize : 0; - GCTotalSizeDiff.MemorySize += PreSize.MemorySize > PostSize.MemorySize ? PreSize.MemorySize - PostSize.MemorySize : 0; - } - } -} - -GcStorageSize -GcManager::TotalStorageSize() const -{ - RwLock::SharedLockScope _(m_Lock); - - GcStorageSize TotalSize; - - for (GcStorage* Storage : m_GcStorage) - { - const auto Size = Storage->StorageSize(); - TotalSize.DiskSize += Size.DiskSize; - TotalSize.MemorySize += Size.MemorySize; - } - - return TotalSize; -} - -#if ZEN_USE_REF_TRACKING -void -GcManager::OnNewCidReferences(std::span<IoHash> Hashes) -{ - ZEN_UNUSED(Hashes); -} - -void -GcManager::OnCommittedCidReferences(std::span<IoHash> Hashes) -{ - ZEN_UNUSED(Hashes); -} - -void -GcManager::OnDroppedCidReferences(std::span<IoHash> Hashes) -{ - ZEN_UNUSED(Hashes); -} -#endif - -////////////////////////////////////////////////////////////////////////// -void -DiskUsageWindow::KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick) -{ - auto It = m_LogWindow.begin(); - if (It == m_LogWindow.end()) - { - return; - } - while (It->SampleTime < StartTick) - { - ++It; - if (It == m_LogWindow.end()) - { - m_LogWindow.clear(); - return; - } - } - m_LogWindow.erase(m_LogWindow.begin(), It); - - It = m_LogWindow.begin(); - while (It != m_LogWindow.end()) - { - if (It->SampleTime >= EndTick) - { - m_LogWindow.erase(It, m_LogWindow.end()); - return; - } - It++; - } -} - -std::vector<uint64_t> -DiskUsageWindow::GetDiskDeltas(GcClock::Tick StartTick, GcClock::Tick EndTick, GcClock::Tick DeltaWidth, uint64_t& OutMaxDelta) const -{ - ZEN_ASSERT(StartTick != -1); - ZEN_ASSERT(DeltaWidth > 0); - - std::vector<uint64_t> Result; - Result.reserve((EndTick - StartTick + DeltaWidth - 1) / DeltaWidth); - - size_t WindowSize = m_LogWindow.size(); - GcClock::Tick FirstWindowTick = WindowSize < 2 ? EndTick : m_LogWindow[1].SampleTime; - - GcClock::Tick RangeStart = StartTick; - while (FirstWindowTick >= RangeStart + DeltaWidth && RangeStart < EndTick) - { - Result.push_back(0); - RangeStart += DeltaWidth; - } - - uint64_t DeltaSum = 0; - size_t WindowIndex = 1; - while (WindowIndex < WindowSize && RangeStart < EndTick) - { - const DiskUsageEntry& Entry = m_LogWindow[WindowIndex]; - if (Entry.SampleTime < RangeStart) - { - ++WindowIndex; - continue; - } - GcClock::Tick RangeEnd = Min(EndTick, RangeStart + DeltaWidth); - ZEN_ASSERT(Entry.SampleTime >= RangeStart); - if (Entry.SampleTime >= RangeEnd) - { - Result.push_back(DeltaSum); - OutMaxDelta = Max(DeltaSum, OutMaxDelta); - DeltaSum = 0; - RangeStart = RangeEnd; - continue; - } - const DiskUsageEntry& PrevEntry = m_LogWindow[WindowIndex - 1]; - if (Entry.DiskUsage > PrevEntry.DiskUsage) - { - uint64_t Delta = Entry.DiskUsage - PrevEntry.DiskUsage; - DeltaSum += Delta; - } - WindowIndex++; - } - - while (RangeStart < EndTick) - { - Result.push_back(DeltaSum); - OutMaxDelta = Max(DeltaSum, OutMaxDelta); - DeltaSum = 0; - RangeStart += DeltaWidth; - } - return Result; -} - -GcClock::Tick -DiskUsageWindow::FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick) const -{ - ZEN_ASSERT(Amount > 0); - uint64_t RemainingToFind = Amount; - size_t Offset = 1; - while (Offset < m_LogWindow.size()) - { - const DiskUsageEntry& Entry = m_LogWindow[Offset]; - if (Entry.SampleTime >= EndTick) - { - return EndTick; - } - const DiskUsageEntry& PreviousEntry = m_LogWindow[Offset - 1]; - uint64_t Delta = Entry.DiskUsage > PreviousEntry.DiskUsage ? Entry.DiskUsage - PreviousEntry.DiskUsage : 0; - if (Delta >= RemainingToFind) - { - return m_LogWindow[Offset].SampleTime + 1; - } - RemainingToFind -= Delta; - Offset++; - } - return EndTick; -} - -////////////////////////////////////////////////////////////////////////// - -GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager) -{ -} - -GcScheduler::~GcScheduler() -{ - Shutdown(); -} - -void -GcScheduler::Initialize(const GcSchedulerConfig& Config) -{ - using namespace std::chrono; - - m_Config = Config; - - if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval) - { - m_Config.Interval = m_Config.MonitorInterval; - } - - std::filesystem::create_directories(Config.RootDirectory); - - std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); - if (Ec) - { - ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'", - m_Config.RootDirectory / "reserve.gc", - NiceBytes(m_Config.DiskReserveSize), - Ec.message()); - } - - m_LastGcTime = GcClock::Now(); - m_LastGcExpireTime = GcClock::TimePoint::min(); - - if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state")) - { - m_LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64())); - m_LastGcExpireTime = - GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcExpireTime"].AsInt64(GcClock::Duration::min().count()))); - if (m_LastGcTime + m_Config.Interval < GcClock::Now()) - { - // TODO: Trigger GC? - m_LastGcTime = GcClock::Now(); - } - } - - m_DiskUsageLog.Open(m_Config.RootDirectory / "gc.dlog", CasLogFile::Mode::kWrite); - m_DiskUsageLog.Initialize(); - const GcClock::Tick LastGCTick = m_LastGcTime.time_since_epoch().count(); - m_DiskUsageLog.Replay( - [this, LastGCTick](const DiskUsageWindow::DiskUsageEntry& Entry) { - if (Entry.SampleTime >= m_LastGcExpireTime.time_since_epoch().count()) - { - m_DiskUsageWindow.Append(Entry); - } - }, - 0); - - m_NextGcTime = NextGcTime(m_LastGcTime); - m_GcThread = std::thread(&GcScheduler::SchedulerThread, this); -} - -void -GcScheduler::Shutdown() -{ - if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status) - { - bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning); - m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped); - m_GcSignal.notify_one(); - - if (m_GcThread.joinable()) - { - if (GcIsRunning) - { - ZEN_INFO("Waiting for garbage collection to complete"); - } - m_GcThread.join(); - } - } - m_DiskUsageLog.Flush(); - m_DiskUsageLog.Close(); -} - -bool -GcScheduler::Trigger(const GcScheduler::TriggerParams& Params) -{ - if (m_Config.Enabled) - { - std::unique_lock Lock(m_GcMutex); - if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) - { - m_TriggerParams = Params; - uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); - if (m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning))) - { - m_GcSignal.notify_one(); - return true; - } - } - } - - return false; -} - -void -GcScheduler::SchedulerThread() -{ - std::chrono::seconds WaitTime{0}; - - for (;;) - { - bool Timeout = false; - { - ZEN_ASSERT(WaitTime.count() >= 0); - std::unique_lock Lock(m_GcMutex); - Timeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, WaitTime); - } - - if (Status() == GcSchedulerStatus::kStopped) - { - break; - } - - if (!m_Config.Enabled) - { - WaitTime = std::chrono::seconds::max(); - continue; - } - - if (!Timeout && Status() == GcSchedulerStatus::kIdle) - { - continue; - } - - bool Delete = true; - bool CollectSmallObjects = m_Config.CollectSmallObjects; - std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration; - uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit; - GcClock::TimePoint Now = GcClock::Now(); - if (m_TriggerParams) - { - const auto TriggerParams = m_TriggerParams.value(); - m_TriggerParams.reset(); - - CollectSmallObjects = TriggerParams.CollectSmallObjects; - if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max()) - { - MaxCacheDuration = TriggerParams.MaxCacheDuration; - } - if (TriggerParams.DiskSizeSoftLimit != 0) - { - DiskSizeSoftLimit = TriggerParams.DiskSizeSoftLimit; - } - } - - GcClock::TimePoint ExpireTime = MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration; - - std::error_code Ec; - const GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); - - if (Timeout && Status() == GcSchedulerStatus::kIdle) - { - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); - if (Ec) - { - ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message()); - } - - const int64_t PressureGraphLength = 30; - const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval; - std::vector<uint64_t> DiskDeltas; - uint64_t MaxLoad = 0; - { - const GcClock::Tick EpochTickCount = GcClock::Now().time_since_epoch().count(); - std::unique_lock Lock(m_GcMutex); - m_DiskUsageWindow.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize}); - m_DiskUsageLog.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize}); - const GcClock::TimePoint LoadGraphStartTime = Now - LoadGraphTime; - GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count(); - GcClock::Tick End = Now.time_since_epoch().count(); - DiskDeltas = m_DiskUsageWindow.GetDiskDeltas(Start, - End, - Max(1, (End - Start + PressureGraphLength - 1) / PressureGraphLength), - MaxLoad); - } - - std::string LoadGraph; - LoadGraph.resize(DiskDeltas.size(), '0'); - if (DiskDeltas.size() > 0 && MaxLoad > 0) - { - char LoadIndicator[11] = "0123456789"; - for (size_t Index = 0; Index < DiskDeltas.size(); ++Index) - { - size_t LoadIndex = (9 * DiskDeltas[Index] + MaxLoad - 1) / MaxLoad; - LoadGraph[Index] = LoadIndicator[LoadIndex]; - } - } - - uint64_t GcDiskSpaceGoal = 0; - if (DiskSizeSoftLimit != 0 && TotalSize.DiskSize > DiskSizeSoftLimit) - { - GcDiskSpaceGoal = TotalSize.DiskSize - DiskSizeSoftLimit; - std::unique_lock Lock(m_GcMutex); - GcClock::Tick AgeTick = m_DiskUsageWindow.FindTimepointThatRemoves(GcDiskSpaceGoal, Now.time_since_epoch().count()); - GcClock::TimePoint SizeBasedExpireTime = GcClock::TimePointFromTick(AgeTick); - if (SizeBasedExpireTime > ExpireTime) - { - ExpireTime = SizeBasedExpireTime; - } - } - - bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0; - - std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now()); - - if (RemaingTime < std::chrono::seconds::zero()) - { - RemaingTime = std::chrono::seconds::zero(); - } - - bool TimeBasedGCTriggered = !DiskSpaceGCTriggered && RemaingTime.count() == 0; - ZEN_INFO( - "{} in use,{} {} of total {} free disk space, disk writes last {} per {} [{}], peak {}/s. {}", - NiceBytes(TotalSize.DiskSize), - DiskSizeSoftLimit == 0 ? "" : fmt::format(" {} soft limit,", NiceBytes(DiskSizeSoftLimit)), - NiceBytes(Space.Free), - NiceBytes(Space.Total), - NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count())), - NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count() / PressureGraphLength)), - LoadGraph, - NiceBytes(MaxLoad * uint64_t(std::chrono::seconds(1).count()) / uint64_t(std::chrono::seconds(LoadGraphTime).count())), - DiskSpaceGCTriggered ? fmt::format("Disk use threshold triggered, trying to reclaim {}. ", NiceBytes(GcDiskSpaceGoal)) - : TimeBasedGCTriggered ? "GC schedule triggered." - : m_NextGcTime == GcClock::TimePoint::max() - ? "" - : fmt::format("{} until next scheduled GC.", NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemaingTime).count())))); - - if (!DiskSpaceGCTriggered && !TimeBasedGCTriggered) - { - WaitTime = m_Config.MonitorInterval < RemaingTime ? m_Config.MonitorInterval : RemaingTime; - continue; - } - - WaitTime = m_Config.MonitorInterval; - uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); - if (!m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning))) - { - continue; - } - } - - CollectGarbage(ExpireTime, Delete, CollectSmallObjects); - - uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning); - if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle))) - { - ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped)); - break; - } - - WaitTime = m_Config.MonitorInterval; - } -} - -GcClock::TimePoint -GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime) -{ - if (m_Config.Interval.count()) - { - return CurrentTime + m_Config.Interval; - } - else - { - return GcClock::TimePoint::max(); - } -} - -void -GcScheduler::CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects) -{ - GcContext GcCtx(ExpireTime); - GcCtx.SetDeletionMode(Delete); - GcCtx.CollectSmallObjects(CollectSmallObjects); - // GcCtx.MaxCacheDuration(MaxCacheDuration); - GcCtx.DiskReservePath(m_Config.RootDirectory / "reserve.gc"); - - ZEN_INFO("garbage collection STARTING, small objects gc {}, cutoff time {}", - GcCtx.CollectSmallObjects() ? "ENABLED"sv : "DISABLED"sv, - ExpireTime); - { - Stopwatch Timer; - const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - m_GcManager.CollectGarbage(GcCtx); - - if (Delete) - { - m_LastGcExpireTime = ExpireTime; - std::unique_lock Lock(m_GcMutex); - m_DiskUsageWindow.KeepRange(ExpireTime.time_since_epoch().count(), GcClock::Duration::max().count()); - } - - m_LastGcTime = GcClock::Now(); - m_NextGcTime = NextGcTime(m_LastGcTime); - - { - const fs::path Path = m_Config.RootDirectory / "gc_state"; - ZEN_DEBUG("saving scheduler state to '{}'", Path); - CbObjectWriter SchedulerState; - SchedulerState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count()); - SchedulerState << "LastGcExpireTime"sv << static_cast<int64_t>(m_LastGcExpireTime.time_since_epoch().count()); - SaveCompactBinaryObject(Path, SchedulerState.Save()); - } - - std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); - if (Ec) - { - ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason: '{}'", - m_Config.RootDirectory / "reserve.gc", - NiceBytes(m_Config.DiskReserveSize), - Ec.message()); - } - } -} - -////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -namespace gc::impl { - static IoBuffer CreateChunk(uint64_t Size) - { - static std::random_device rd; - static std::mt19937 g(rd()); - - std::vector<uint8_t> Values; - Values.resize(Size); - for (size_t Idx = 0; Idx < Size; ++Idx) - { - Values[Idx] = static_cast<uint8_t>(Idx); - } - std::shuffle(Values.begin(), Values.end(), g); - - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); - } - - static CompressedBuffer Compress(IoBuffer Buffer) - { - return CompressedBuffer::Compress(SharedBuffer::MakeView(Buffer.GetData(), Buffer.GetSize())); - } -} // namespace gc::impl - -TEST_CASE("gc.basic") -{ - using namespace gc::impl; - - ScopedTemporaryDirectory TempDir; - - CidStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path() / "cas"; - - GcManager Gc; - CidStore CidStore(Gc); - - CidStore.Initialize(CasConfig); - - IoBuffer Chunk = CreateChunk(128); - auto CompressedChunk = Compress(Chunk); - - const auto InsertResult = CidStore.AddChunk(CompressedChunk.GetCompressed().Flatten().AsIoBuffer(), CompressedChunk.DecodeRawHash()); - CHECK(InsertResult.New); - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - - CidStore.Flush(); - Gc.CollectGarbage(GcCtx); - - CHECK(!CidStore.ContainsChunk(CompressedChunk.DecodeRawHash())); -} - -TEST_CASE("gc.full") -{ - using namespace gc::impl; - - ScopedTemporaryDirectory TempDir; - - CidStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path() / "cas"; - - GcManager Gc; - std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc); - - CasStore->Initialize(CasConfig); - - uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; - IoBuffer Chunks[9] = {CreateChunk(ChunkSizes[0]), - CreateChunk(ChunkSizes[1]), - CreateChunk(ChunkSizes[2]), - CreateChunk(ChunkSizes[3]), - CreateChunk(ChunkSizes[4]), - CreateChunk(ChunkSizes[5]), - CreateChunk(ChunkSizes[6]), - CreateChunk(ChunkSizes[7]), - CreateChunk(ChunkSizes[8])}; - IoHash ChunkHashes[9] = { - IoHash::HashBuffer(Chunks[0].Data(), Chunks[0].Size()), - IoHash::HashBuffer(Chunks[1].Data(), Chunks[1].Size()), - IoHash::HashBuffer(Chunks[2].Data(), Chunks[2].Size()), - IoHash::HashBuffer(Chunks[3].Data(), Chunks[3].Size()), - IoHash::HashBuffer(Chunks[4].Data(), Chunks[4].Size()), - IoHash::HashBuffer(Chunks[5].Data(), Chunks[5].Size()), - IoHash::HashBuffer(Chunks[6].Data(), Chunks[6].Size()), - IoHash::HashBuffer(Chunks[7].Data(), Chunks[7].Size()), - IoHash::HashBuffer(Chunks[8].Data(), Chunks[8].Size()), - }; - - CasStore->InsertChunk(Chunks[0], ChunkHashes[0]); - CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); - CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); - CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); - CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); - CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); - CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); - CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); - CasStore->InsertChunk(Chunks[8], ChunkHashes[8]); - - CidStoreSize InitialSize = CasStore->TotalSize(); - - // Keep first and last - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[0]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - CasStore->Flush(); - Gc.CollectGarbage(GcCtx); - - CHECK(CasStore->ContainsChunk(ChunkHashes[0])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[1])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[4])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[6])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[7])); - CHECK(CasStore->ContainsChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[0] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); - } - - CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); - CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); - CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); - CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); - CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); - CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); - CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); - - // Keep last - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - CasStore->Flush(); - Gc.CollectGarbage(GcCtx); - - CHECK(!CasStore->ContainsChunk(ChunkHashes[0])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[1])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[4])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[6])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[7])); - CHECK(CasStore->ContainsChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); - - CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); - CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); - CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); - CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); - CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); - CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); - CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); - } - - // Keep mixed - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[1]); - KeepChunks.push_back(ChunkHashes[4]); - KeepChunks.push_back(ChunkHashes[7]); - GcCtx.AddRetainedCids(KeepChunks); - - CasStore->Flush(); - Gc.CollectGarbage(GcCtx); - - CHECK(!CasStore->ContainsChunk(ChunkHashes[0])); - CHECK(CasStore->ContainsChunk(ChunkHashes[1])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); - CHECK(CasStore->ContainsChunk(ChunkHashes[4])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[6])); - CHECK(CasStore->ContainsChunk(ChunkHashes[7])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[1] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7]))); - - CasStore->InsertChunk(Chunks[0], ChunkHashes[0]); - CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); - CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); - CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); - CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); - CasStore->InsertChunk(Chunks[8], ChunkHashes[8]); - } - - // Keep multiple at end - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[6]); - KeepChunks.push_back(ChunkHashes[7]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - CasStore->Flush(); - Gc.CollectGarbage(GcCtx); - - CHECK(!CasStore->ContainsChunk(ChunkHashes[0])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[1])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[4])); - CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); - CHECK(CasStore->ContainsChunk(ChunkHashes[6])); - CHECK(CasStore->ContainsChunk(ChunkHashes[7])); - CHECK(CasStore->ContainsChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[6] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); - - CasStore->InsertChunk(Chunks[0], ChunkHashes[0]); - CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); - CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); - CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); - CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); - CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); - } - - // Verify that we nicely appended blocks even after all GC operations - CHECK(ChunkHashes[0] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[1] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[2] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[2]))); - CHECK(ChunkHashes[3] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[3]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[5] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[5]))); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); - - auto FinalSize = CasStore->TotalSize(); - - CHECK_LE(InitialSize.TinySize, FinalSize.TinySize); - CHECK_GE(InitialSize.TinySize + (1u << 28), FinalSize.TinySize); -} - -TEST_CASE("gc.diskusagewindow") -{ - using namespace gc::impl; - - DiskUsageWindow Stats; - Stats.Append({.SampleTime = 0, .DiskUsage = 0}); // 0 0 - Stats.Append({.SampleTime = 10, .DiskUsage = 10}); // 1 10 - Stats.Append({.SampleTime = 20, .DiskUsage = 20}); // 2 10 - Stats.Append({.SampleTime = 30, .DiskUsage = 20}); // 3 0 - Stats.Append({.SampleTime = 40, .DiskUsage = 15}); // 4 0 - Stats.Append({.SampleTime = 50, .DiskUsage = 25}); // 5 10 - Stats.Append({.SampleTime = 60, .DiskUsage = 30}); // 6 5 - Stats.Append({.SampleTime = 70, .DiskUsage = 45}); // 7 15 - - SUBCASE("Truncate start") - { - Stats.KeepRange(-15, 31); - CHECK(Stats.m_LogWindow.size() == 4); - CHECK(Stats.m_LogWindow[0].SampleTime == 0); - CHECK(Stats.m_LogWindow[3].SampleTime == 30); - } - - SUBCASE("Truncate end") - { - Stats.KeepRange(70, 71); - CHECK(Stats.m_LogWindow.size() == 1); - CHECK(Stats.m_LogWindow[0].SampleTime == 70); - } - - SUBCASE("Truncate middle") - { - Stats.KeepRange(29, 69); - CHECK(Stats.m_LogWindow.size() == 4); - CHECK(Stats.m_LogWindow[0].SampleTime == 30); - CHECK(Stats.m_LogWindow[3].SampleTime == 60); - } - - SUBCASE("Full range") - { - uint64_t MaxDelta = 0; - // 0-10, 10-20, 20-30, 30-40, 40-50, 50-60, 60-70, 70-80 - std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(0, 80, 10, MaxDelta); - CHECK(DiskDeltas.size() == 8); - CHECK(MaxDelta == 15); - CHECK(DiskDeltas[0] == 0); - CHECK(DiskDeltas[1] == 10); - CHECK(DiskDeltas[2] == 10); - CHECK(DiskDeltas[3] == 0); - CHECK(DiskDeltas[4] == 0); - CHECK(DiskDeltas[5] == 10); - CHECK(DiskDeltas[6] == 5); - CHECK(DiskDeltas[7] == 15); - } - - SUBCASE("Sub range") - { - uint64_t MaxDelta = 0; - std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(20, 40, 10, MaxDelta); - CHECK(DiskDeltas.size() == 2); - CHECK(MaxDelta == 10); - CHECK(DiskDeltas[0] == 10); // [20:30] - CHECK(DiskDeltas[1] == 0); // [30:40] - } - SUBCASE("Unaligned sub range 1") - { - uint64_t MaxDelta = 0; - std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(21, 51, 10, MaxDelta); - CHECK(DiskDeltas.size() == 3); - CHECK(MaxDelta == 10); - CHECK(DiskDeltas[0] == 0); // [21:31] - CHECK(DiskDeltas[1] == 0); // [31:41] - CHECK(DiskDeltas[2] == 10); // [41:51] - } - SUBCASE("Unaligned end range") - { - uint64_t MaxDelta = 0; - std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(29, 79, 10, MaxDelta); - CHECK(DiskDeltas.size() == 5); - CHECK(MaxDelta == 15); - CHECK(DiskDeltas[0] == 0); // [29:39] - CHECK(DiskDeltas[1] == 0); // [39:49] - CHECK(DiskDeltas[2] == 10); // [49:59] - CHECK(DiskDeltas[3] == 5); // [59:69] - CHECK(DiskDeltas[4] == 15); // [69:79] - } - SUBCASE("Ahead of window") - { - uint64_t MaxDelta = 0; - std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(-40, 0, 10, MaxDelta); - CHECK(DiskDeltas.size() == 4); - CHECK(MaxDelta == 0); - CHECK(DiskDeltas[0] == 0); // [-40:-30] - CHECK(DiskDeltas[1] == 0); // [-30:-20] - CHECK(DiskDeltas[2] == 0); // [-20:-10] - CHECK(DiskDeltas[3] == 0); // [-10:0] - } - SUBCASE("After of window") - { - uint64_t MaxDelta = 0; - std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(90, 120, 10, MaxDelta); - CHECK(DiskDeltas.size() == 3); - CHECK(MaxDelta == 0); - CHECK(DiskDeltas[0] == 0); // [90:100] - CHECK(DiskDeltas[1] == 0); // [100:110] - CHECK(DiskDeltas[2] == 0); // [110:120] - } - SUBCASE("Encapsulating window") - { - uint64_t MaxDelta = 0; - std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(-20, 100, 10, MaxDelta); - CHECK(DiskDeltas.size() == 12); - CHECK(MaxDelta == 15); - CHECK(DiskDeltas[0] == 0); // [-20:-10] - CHECK(DiskDeltas[1] == 0); // [ -10:0] - CHECK(DiskDeltas[2] == 0); // [0:10] - CHECK(DiskDeltas[3] == 10); // [10:20] - CHECK(DiskDeltas[4] == 10); // [20:30] - CHECK(DiskDeltas[5] == 0); // [30:40] - CHECK(DiskDeltas[6] == 0); // [40:50] - CHECK(DiskDeltas[7] == 10); // [50:60] - CHECK(DiskDeltas[8] == 5); // [60:70] - CHECK(DiskDeltas[9] == 15); // [70:80] - CHECK(DiskDeltas[10] == 0); // [80:90] - CHECK(DiskDeltas[11] == 0); // [90:100] - } - - SUBCASE("Full range half stride") - { - uint64_t MaxDelta = 0; - std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(0, 80, 20, MaxDelta); - CHECK(DiskDeltas.size() == 4); - CHECK(MaxDelta == 20); - CHECK(DiskDeltas[0] == 10); // [0:20] - CHECK(DiskDeltas[1] == 10); // [20:40] - CHECK(DiskDeltas[2] == 10); // [40:60] - CHECK(DiskDeltas[3] == 20); // [60:80] - } - - SUBCASE("Partial odd stride") - { - uint64_t MaxDelta = 0; - std::vector<uint64_t> DiskDeltas = Stats.GetDiskDeltas(13, 67, 18, MaxDelta); - CHECK(DiskDeltas.size() == 3); - CHECK(MaxDelta == 15); - CHECK(DiskDeltas[0] == 10); // [13:31] - CHECK(DiskDeltas[1] == 0); // [31:49] - CHECK(DiskDeltas[2] == 15); // [49:67] - } - - SUBCASE("Find size window") - { - DiskUsageWindow Empty; - CHECK(Empty.FindTimepointThatRemoves(15u, 10000) == 10000); - - CHECK(Stats.FindTimepointThatRemoves(15u, 40) == 21); - CHECK(Stats.FindTimepointThatRemoves(15u, 20) == 20); - CHECK(Stats.FindTimepointThatRemoves(100000u, 50) == 50); - CHECK(Stats.FindTimepointThatRemoves(100000u, 1000)); - } -} -#endif - -void -gc_forcelink() -{ -} - -} // namespace zen diff --git a/zenstore/hashkeyset.cpp b/zenstore/hashkeyset.cpp deleted file mode 100644 index a5436f5cb..000000000 --- a/zenstore/hashkeyset.cpp +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenstore/hashkeyset.h> - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -void -HashKeySet::AddHashToSet(const IoHash& HashToAdd) -{ - m_HashSet.insert(HashToAdd); -} - -void -HashKeySet::AddHashesToSet(std::span<const IoHash> HashesToAdd) -{ - m_HashSet.insert(HashesToAdd.begin(), HashesToAdd.end()); -} - -void -HashKeySet::RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) -{ - for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd;) - { - if (Predicate(*It)) - { - It = m_HashSet.erase(It); - } - else - { - ++It; - } - } -} - -void -HashKeySet::IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const -{ - for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd; ++It) - { - Callback(*It); - } -} - -////////////////////////////////////////////////////////////////////////// -// -// Testing related code follows... -// - -#if ZEN_WITH_TESTS - -void -hashkeyset_forcelink() -{ -} - -#endif - -} // namespace zen diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h deleted file mode 100644 index 857ccae38..000000000 --- a/zenstore/include/zenstore/blockstore.h +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/filesystem.h> -#include <zencore/zencore.h> -#include <zenutil/basicfile.h> - -#include <unordered_map> -#include <unordered_set> - -namespace zen { - -////////////////////////////////////////////////////////////////////////// - -struct BlockStoreLocation -{ - uint32_t BlockIndex; - uint64_t Offset; - uint64_t Size; - - inline auto operator<=>(const BlockStoreLocation& Rhs) const = default; -}; - -#pragma pack(push) -#pragma pack(1) - -struct BlockStoreDiskLocation -{ - constexpr static uint32_t MaxBlockIndexBits = 20; - constexpr static uint32_t MaxOffsetBits = 28; - constexpr static uint32_t MaxBlockIndex = (1ul << BlockStoreDiskLocation::MaxBlockIndexBits) - 1ul; - constexpr static uint32_t MaxOffset = (1ul << BlockStoreDiskLocation::MaxOffsetBits) - 1ul; - - BlockStoreDiskLocation(const BlockStoreLocation& Location, uint64_t OffsetAlignment) - { - Init(Location.BlockIndex, Location.Offset / OffsetAlignment, Location.Size); - } - - BlockStoreDiskLocation() = default; - - inline BlockStoreLocation Get(uint64_t OffsetAlignment) const - { - uint64_t PackedOffset = 0; - memcpy(&PackedOffset, &m_Offset, sizeof m_Offset); - return {.BlockIndex = static_cast<std::uint32_t>(PackedOffset >> MaxOffsetBits), - .Offset = (PackedOffset & MaxOffset) * OffsetAlignment, - .Size = GetSize()}; - } - - inline uint32_t GetBlockIndex() const - { - uint64_t PackedOffset = 0; - memcpy(&PackedOffset, &m_Offset, sizeof m_Offset); - return static_cast<std::uint32_t>(PackedOffset >> MaxOffsetBits); - } - - inline uint64_t GetOffset(uint64_t OffsetAlignment) const - { - uint64_t PackedOffset = 0; - memcpy(&PackedOffset, &m_Offset, sizeof m_Offset); - return (PackedOffset & MaxOffset) * OffsetAlignment; - } - - inline uint64_t GetSize() const { return m_Size; } - - inline auto operator<=>(const BlockStoreDiskLocation& Rhs) const = default; - -private: - inline void Init(uint32_t BlockIndex, uint64_t Offset, uint64_t Size) - { - ZEN_ASSERT(BlockIndex <= MaxBlockIndex); - ZEN_ASSERT(Offset <= MaxOffset); - ZEN_ASSERT(Size <= std::numeric_limits<std::uint32_t>::max()); - - m_Size = static_cast<uint32_t>(Size); - uint64_t PackedOffset = (static_cast<uint64_t>(BlockIndex) << MaxOffsetBits) + Offset; - memcpy(&m_Offset[0], &PackedOffset, sizeof m_Offset); - } - - uint32_t m_Size; - uint8_t m_Offset[6]; -}; - -#pragma pack(pop) - -struct BlockStoreFile : public RefCounted -{ - explicit BlockStoreFile(const std::filesystem::path& BlockPath); - ~BlockStoreFile(); - const std::filesystem::path& GetPath() const; - void Open(); - void Create(uint64_t InitialSize); - void MarkAsDeleteOnClose(); - uint64_t FileSize(); - IoBuffer GetChunk(uint64_t Offset, uint64_t Size); - void Read(void* Data, uint64_t Size, uint64_t FileOffset); - void Write(const void* Data, uint64_t Size, uint64_t FileOffset); - void Flush(); - BasicFile& GetBasicFile(); - void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); - -private: - const std::filesystem::path m_Path; - IoBuffer m_IoBuffer; - BasicFile m_File; -}; - -class BlockStore -{ -public: - struct ReclaimSnapshotState - { - std::unordered_set<uint32_t> m_ActiveWriteBlocks; - size_t BlockCount; - }; - - typedef std::vector<std::pair<size_t, BlockStoreLocation>> MovedChunksArray; - typedef std::vector<size_t> ChunkIndexArray; - - typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback; - typedef std::function<uint64_t()> ClaimDiskReserveCallback; - typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback; - typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; - typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback; - - void Initialize(const std::filesystem::path& BlocksBasePath, - uint64_t MaxBlockSize, - uint64_t MaxBlockCount, - const std::vector<BlockStoreLocation>& KnownLocations); - void Close(); - - void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback); - - IoBuffer TryGetChunk(const BlockStoreLocation& Location) const; - void Flush(); - - ReclaimSnapshotState GetReclaimSnapshotState(); - void ReclaimSpace( - const ReclaimSnapshotState& Snapshot, - const std::vector<BlockStoreLocation>& ChunkLocations, - const ChunkIndexArray& KeepChunkIndexes, - uint64_t PayloadAlignment, - bool DryRun, - const ReclaimCallback& ChangeCallback = [](const MovedChunksArray&, const ChunkIndexArray&) {}, - const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; }); - - void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, - const IterateChunksSmallSizeCallback& SmallSizeCallback, - const IterateChunksLargeSizeCallback& LargeSizeCallback); - - static const char* GetBlockFileExtension(); - static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex); - - inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); } - -private: - std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks; - - mutable RwLock m_InsertLock; // used to serialize inserts - Ref<BlockStoreFile> m_WriteBlock; - std::uint64_t m_CurrentInsertOffset = 0; - std::atomic_uint32_t m_WriteBlockIndex{}; - std::vector<uint32_t> m_ActiveWriteBlocks; - - uint64_t m_MaxBlockSize = 1u << 28; - uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1; - std::filesystem::path m_BlocksBasePath; - - std::atomic_uint64_t m_TotalSize{}; -}; - -void blockstore_forcelink(); - -} // namespace zen diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h deleted file mode 100644 index d8c3f22f3..000000000 --- a/zenstore/include/zenstore/caslog.h +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/uid.h> -#include <zenutil/basicfile.h> - -namespace zen { - -class CasLogFile -{ -public: - CasLogFile(); - ~CasLogFile(); - - enum class Mode - { - kRead, - kWrite, - kTruncate - }; - - static bool IsValid(std::filesystem::path FileName, size_t RecordSize); - void Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode); - void Append(const void* DataPointer, uint64_t DataSize); - void Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount); - void Flush(); - void Close(); - uint64_t GetLogSize(); - uint64_t GetLogCount(); - -private: - struct FileHeader - { - uint8_t Magic[16]; - uint32_t RecordSize = 0; - Oid LogId; - uint32_t ValidatedTail = 0; - uint32_t Pad[6]; - uint32_t Checksum = 0; - - static const inline uint8_t MagicSequence[16] = {'.', '-', '=', ' ', 'C', 'A', 'S', 'L', 'O', 'G', 'v', '1', ' ', '=', '-', '.'}; - - ZENCORE_API uint32_t ComputeChecksum(); - void Finalize() { Checksum = ComputeChecksum(); } - }; - - static_assert(sizeof(FileHeader) == 64); - -private: - void Open(std::filesystem::path FileName, size_t RecordSize, BasicFile::Mode Mode); - - BasicFile m_File; - FileHeader m_Header; - size_t m_RecordSize = 1; - std::atomic<uint64_t> m_AppendOffset = 0; -}; - -template<typename T> -class TCasLogFile : public CasLogFile -{ -public: - static bool IsValid(std::filesystem::path FileName) { return CasLogFile::IsValid(FileName, sizeof(T)); } - void Open(std::filesystem::path FileName, Mode Mode) { CasLogFile::Open(FileName, sizeof(T), Mode); } - - // This should be called before the Replay() is called to do some basic sanity checking - bool Initialize() { return true; } - - void Replay(Invocable<const T&> auto Handler, uint64_t SkipEntryCount) - { - CasLogFile::Replay( - [&](const void* VoidPtr) { - const T& Record = *reinterpret_cast<const T*>(VoidPtr); - - Handler(Record); - }, - SkipEntryCount); - } - - void Append(const T& Record) - { - // TODO: implement some more efficent path here so we don't end up with - // a syscall per append - - CasLogFile::Append(&Record, sizeof Record); - } - - void Append(const std::span<T>& Records) { CasLogFile::Append(Records.data(), sizeof(T) * Records.size()); } -}; - -} // namespace zen diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h deleted file mode 100644 index 16ca78225..000000000 --- a/zenstore/include/zenstore/cidstore.h +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "zenstore.h" - -#include <zencore/iohash.h> -#include <zenstore/hashkeyset.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#include <filesystem> - -namespace zen { - -class GcManager; -class CasStore; -class CompressedBuffer; -class IoBuffer; -class ScrubContext; - -/** Content Store - * - * Data in the content store is referenced by content identifiers (CIDs), it works - * with compressed buffers so the CID is expected to be the RAW hash. It stores the - * chunk directly under the RAW hash. - * This class maps uncompressed hashes (CIDs) to compressed hashes and may - * be used to deal with other kinds of indirections in the future. For example, if we want - * to support chunking then a CID may represent a list of chunks which could be concatenated - * to form the referenced chunk. - * - */ - -struct CidStoreSize -{ - uint64_t TinySize = 0; - uint64_t SmallSize = 0; - uint64_t LargeSize = 0; - uint64_t TotalSize = 0; -}; - -struct CidStoreConfiguration -{ - // Root directory for CAS store - std::filesystem::path RootDirectory; - - // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy - uint64_t TinyValueThreshold = 1024; - - // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy - uint64_t HugeValueThreshold = 1024 * 1024; -}; - -class CidStore -{ -public: - CidStore(GcManager& Gc); - ~CidStore(); - - struct InsertResult - { - bool New = false; - }; - enum class InsertMode - { - kCopyOnly, - kMayBeMovedInPlace - }; - - void Initialize(const CidStoreConfiguration& Config); - InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace); - IoBuffer FindChunkByCid(const IoHash& DecompressedId); - bool ContainsChunk(const IoHash& DecompressedId); - void FilterChunks(HashKeySet& InOutChunks); - void Flush(); - void Scrub(ScrubContext& Ctx); - CidStoreSize TotalSize() const; - -private: - struct Impl; - std::unique_ptr<CasStore> m_CasStore; - std::unique_ptr<Impl> m_Impl; -}; - -} // namespace zen diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h deleted file mode 100644 index e0354b331..000000000 --- a/zenstore/include/zenstore/gc.h +++ /dev/null @@ -1,242 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/iohash.h> -#include <zencore/thread.h> -#include <zenstore/caslog.h> - -#include <atomic> -#include <chrono> -#include <condition_variable> -#include <filesystem> -#include <functional> -#include <optional> -#include <span> -#include <thread> - -#define ZEN_USE_REF_TRACKING 0 // This is not currently functional - -namespace spdlog { -class logger; -} - -namespace zen { - -class HashKeySet; -class GcManager; -class CidStore; -struct IoHash; - -/** GC clock - */ -class GcClock -{ -public: - using Clock = std::chrono::system_clock; - using TimePoint = Clock::time_point; - using Duration = Clock::duration; - using Tick = int64_t; - - static Tick TickCount() { return Now().time_since_epoch().count(); } - static TimePoint Now() { return Clock::now(); } - static TimePoint TimePointFromTick(const Tick TickCount) { return TimePoint{Duration{TickCount}}; } -}; - -/** Garbage Collection context object - */ -class GcContext -{ -public: - GcContext(const GcClock::TimePoint& ExpireTime); - ~GcContext(); - - void AddRetainedCids(std::span<const IoHash> Cid); - void SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys); - - void IterateCids(std::function<void(const IoHash&)> Callback); - - void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc); - void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc); - - void AddDeletedCids(std::span<const IoHash> Cas); - const HashKeySet& DeletedCids(); - - std::span<const IoHash> ExpiredCacheKeys(const std::string& CacheKeyContext) const; - - bool IsDeletionMode() const; - void SetDeletionMode(bool NewState); - - bool CollectSmallObjects() const; - void CollectSmallObjects(bool NewState); - - GcClock::TimePoint ExpireTime() const; - - void DiskReservePath(const std::filesystem::path& Path); - uint64_t ClaimGCReserve(); - -private: - struct GcState; - - std::unique_ptr<GcState> m_State; -}; - -/** GC root contributor - - Higher level data structures provide roots for the garbage collector, - which ultimately determine what is garbage and what data we need to - retain. - - */ -class GcContributor -{ -public: - GcContributor(GcManager& Gc); - ~GcContributor(); - - virtual void GatherReferences(GcContext& GcCtx) = 0; - -protected: - GcManager& m_Gc; -}; - -struct GcStorageSize -{ - uint64_t DiskSize{}; - uint64_t MemorySize{}; -}; - -/** GC storage provider - */ -class GcStorage -{ -public: - GcStorage(GcManager& Gc); - ~GcStorage(); - - virtual void CollectGarbage(GcContext& GcCtx) = 0; - virtual GcStorageSize StorageSize() const = 0; - -private: - GcManager& m_Gc; -}; - -/** GC orchestrator - */ -class GcManager -{ -public: - GcManager(); - ~GcManager(); - - void AddGcContributor(GcContributor* Contributor); - void RemoveGcContributor(GcContributor* Contributor); - - void AddGcStorage(GcStorage* Contributor); - void RemoveGcStorage(GcStorage* Contributor); - - void CollectGarbage(GcContext& GcCtx); - - GcStorageSize TotalStorageSize() const; - -#if ZEN_USE_REF_TRACKING - void OnNewCidReferences(std::span<IoHash> Hashes); - void OnCommittedCidReferences(std::span<IoHash> Hashes); - void OnDroppedCidReferences(std::span<IoHash> Hashes); -#endif - -private: - spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - mutable RwLock m_Lock; - std::vector<GcContributor*> m_GcContribs; - std::vector<GcStorage*> m_GcStorage; - CidStore* m_CidStore = nullptr; -}; - -enum class GcSchedulerStatus : uint32_t -{ - kIdle, - kRunning, - kStopped -}; - -struct GcSchedulerConfig -{ - std::filesystem::path RootDirectory; - std::chrono::seconds MonitorInterval{30}; - std::chrono::seconds Interval{}; - std::chrono::seconds MaxCacheDuration{86400}; - bool CollectSmallObjects = true; - bool Enabled = true; - uint64_t DiskReserveSize = 1ul << 28; - uint64_t DiskSizeSoftLimit = 0; -}; - -class DiskUsageWindow -{ -public: - struct DiskUsageEntry - { - GcClock::Tick SampleTime; - uint64_t DiskUsage; - }; - - std::vector<DiskUsageEntry> m_LogWindow; - inline void Append(const DiskUsageEntry& Entry) { m_LogWindow.push_back(Entry); } - inline void Append(DiskUsageEntry&& Entry) { m_LogWindow.emplace_back(std::move(Entry)); } - void KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick); - std::vector<uint64_t> GetDiskDeltas(GcClock::Tick StartTick, - GcClock::Tick EndTick, - GcClock::Tick DeltaWidth, - uint64_t& OutMaxDelta) const; - GcClock::Tick FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick) const; -}; - -/** - * GC scheduler - */ -class GcScheduler -{ -public: - GcScheduler(GcManager& GcManager); - ~GcScheduler(); - - void Initialize(const GcSchedulerConfig& Config); - void Shutdown(); - GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } - - struct TriggerParams - { - bool CollectSmallObjects = false; - std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max(); - uint64_t DiskSizeSoftLimit = 0; - }; - - bool Trigger(const TriggerParams& Params); - -private: - void SchedulerThread(); - void CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects); - GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime); - spdlog::logger& Log() { return m_Log; } - - spdlog::logger& m_Log; - GcManager& m_GcManager; - GcSchedulerConfig m_Config; - GcClock::TimePoint m_LastGcTime{}; - GcClock::TimePoint m_LastGcExpireTime{}; - GcClock::TimePoint m_NextGcTime{}; - std::atomic_uint32_t m_Status{}; - std::thread m_GcThread; - std::mutex m_GcMutex; - std::condition_variable m_GcSignal; - std::optional<TriggerParams> m_TriggerParams; - - TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog; - DiskUsageWindow m_DiskUsageWindow; -}; - -void gc_forcelink(); - -} // namespace zen diff --git a/zenstore/include/zenstore/hashkeyset.h b/zenstore/include/zenstore/hashkeyset.h deleted file mode 100644 index 411a6256e..000000000 --- a/zenstore/include/zenstore/hashkeyset.h +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "zenstore.h" - -#include <zencore/iohash.h> - -#include <functional> -#include <unordered_set> - -namespace zen { - -/** Manage a set of IoHash values - */ - -class HashKeySet -{ -public: - void AddHashToSet(const IoHash& HashToAdd); - void AddHashesToSet(std::span<const IoHash> HashesToAdd); - void RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); - void IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const; - [[nodiscard]] inline bool ContainsHash(const IoHash& Hash) const { return m_HashSet.find(Hash) != m_HashSet.end(); } - [[nodiscard]] inline bool IsEmpty() const { return m_HashSet.empty(); } - [[nodiscard]] inline size_t GetSize() const { return m_HashSet.size(); } - - inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) const - { - for (const IoHash& Candidate : Candidates) - { - if (ContainsHash(Candidate)) - { - MatchFunc(Candidate); - } - } - } - - inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) const - { - for (const IoHash& Candidate : Candidates) - { - MatchFunc(Candidate, ContainsHash(Candidate)); - } - } - -private: - // Q: should we protect this with a lock, or is that a higher level concern? - std::unordered_set<IoHash, IoHash::Hasher> m_HashSet; -}; - -void hashkeyset_forcelink(); - -} // namespace zen diff --git a/zenstore/include/zenstore/scrubcontext.h b/zenstore/include/zenstore/scrubcontext.h deleted file mode 100644 index 0b884fcc6..000000000 --- a/zenstore/include/zenstore/scrubcontext.h +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/timer.h> -#include <zenstore/hashkeyset.h> - -namespace zen { - -/** Context object for data scrubbing - * - * Data scrubbing is when we traverse stored data to validate it and - * optionally correct/recover - */ - -class ScrubContext -{ -public: - virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); } - inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } - inline bool RunRecovery() const { return m_Recover; } - void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) - { - m_ChunkCount.fetch_add(ChunkCount); - m_ByteCount.fetch_add(ChunkBytes); - } - - inline uint64_t ScrubbedChunks() const { return m_ChunkCount; } - inline uint64_t ScrubbedBytes() const { return m_ByteCount; } - - const HashKeySet BadCids() const { return m_BadCid; } - -private: - uint64_t m_ScrubTime = GetHifreqTimerValue(); - bool m_Recover = true; - std::atomic<uint64_t> m_ChunkCount{0}; - std::atomic<uint64_t> m_ByteCount{0}; - HashKeySet m_BadCid; -}; - -} // namespace zen diff --git a/zenstore/include/zenstore/zenstore.h b/zenstore/include/zenstore/zenstore.h deleted file mode 100644 index 46d62029d..000000000 --- a/zenstore/include/zenstore/zenstore.h +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/zencore.h> - -#define ZENSTORE_API - -namespace zen { - -ZENSTORE_API void zenstore_forcelinktests(); - -} diff --git a/zenstore/xmake.lua b/zenstore/xmake.lua deleted file mode 100644 index 4469c5650..000000000 --- a/zenstore/xmake.lua +++ /dev/null @@ -1,9 +0,0 @@ --- Copyright Epic Games, Inc. All Rights Reserved. - -target('zenstore') - set_kind("static") - add_headerfiles("**.h") - add_files("**.cpp") - add_includedirs("include", {public=true}) - add_deps("zencore", "zenutil") - add_packages("vcpkg::robin-map") diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp deleted file mode 100644 index d87652fde..000000000 --- a/zenstore/zenstore.cpp +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "zenstore/zenstore.h" - -#if ZEN_WITH_TESTS - -# include <zenstore/blockstore.h> -# include <zenstore/gc.h> -# include <zenstore/hashkeyset.h> -# include <zenutil/basicfile.h> - -# include "cas.h" -# include "compactcas.h" -# include "filecas.h" - -namespace zen { - -void -zenstore_forcelinktests() -{ - basicfile_forcelink(); - CAS_forcelink(); - filecas_forcelink(); - blockstore_forcelink(); - compactcas_forcelink(); - gc_forcelink(); - hashkeyset_forcelink(); -} - -} // namespace zen - -#endif |