diff options
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 2443 |
1 files changed, 2172 insertions, 271 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 3bf0c70df..51fe7a901 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -1,28 +1,24 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <zenstore/cas.h> - #include "compactcas.h" -#include <zencore/compactbinarybuilder.h> +#include <zenstore/cas.h> + #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> -#include <zencore/memory.h> -#include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> -#include <zencore/thread.h> -#include <zencore/uid.h> - -#include <zenstore/gc.h> - -#include <filesystem> -#include <functional> +#include <zencore/scopeguard.h> #include <gsl/gsl-lite.hpp> +#include <xxhash.h> + #if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <zenstore/cidstore.h> # include <algorithm> # include <random> #endif @@ -31,6 +27,211 @@ namespace zen { +struct CasDiskIndexHeader +{ + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } +}; + +static_assert(sizeof(CasDiskIndexHeader) == 32); + +namespace { + std::vector<CasDiskIndexEntry> MakeCasDiskEntries(const std::unordered_map<IoHash, BlockStoreDiskLocation>& MovedChunks, + const std::vector<IoHash>& DeletedChunks) + { + std::vector<CasDiskIndexEntry> result; + result.reserve(MovedChunks.size()); + for (const auto& MovedEntry : MovedChunks) + { + result.push_back({.Key = 
MovedEntry.first, .Location = MovedEntry.second}); + } + for (const IoHash& ChunkHash : DeletedChunks) + { + result.push_back({.Key = ChunkHash, .Flags = CasDiskIndexEntry::kTombstone}); + } + return result; + } + + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".ulog"; + const char* DataExtension = ".ucas"; + + std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / ContainerBaseName; + } + + std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); + } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); + } + + std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / "blocks"; + } + + std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) + { + ExtendablePathBuilder<256> Path; + + char BlockHexString[9]; + ToHexNumber(BlockIndex, BlockHexString); + + Path.Append(BlocksBasePath); + Path.AppendSeparator(); + Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); + Path.AppendSeparator(); + Path.Append(BlockHexString); + Path.Append(DataExtension); + return Path.ToPath(); + } + + std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / (ContainerBaseName + LogExtension); + } + + std::filesystem::path 
GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / (ContainerBaseName + DataExtension); + } + + std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / (ContainerBaseName + IndexExtension); + } + + struct LegacyCasDiskLocation + { + LegacyCasDiskLocation(uint64_t InOffset, uint64_t InSize) + { + ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); + ZEN_ASSERT(InSize <= 0xff'ffff'ffff); + + memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); + memcpy(&m_Size[0], &InSize, sizeof m_Size); + } + + LegacyCasDiskLocation() = default; + + inline uint64_t GetOffset() const + { + uint64_t Offset = 0; + memcpy(&Offset, &m_Offset, sizeof m_Offset); + return Offset; + } + + inline uint64_t GetSize() const + { + uint64_t Size = 0; + memcpy(&Size, &m_Size, sizeof m_Size); + return Size; + } + + private: + uint8_t m_Offset[5]; + uint8_t m_Size[5]; + }; + + struct LegacyCasDiskIndexEntry + { + static const uint8_t kTombstone = 0x01; + + IoHash Key; + LegacyCasDiskLocation Location; + ZenContentType ContentType = ZenContentType::kUnknownContentType; + uint8_t Flags = 0; + }; + + bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if ((Entry.Flags & ~LegacyCasDiskIndexEntry::kTombstone) != 0) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); + return false; + } + if (Entry.Flags & LegacyCasDiskIndexEntry::kTombstone) + { + return true; + } + if (Entry.ContentType != ZenContentType::kUnknownContentType) + { + OutReason = + fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); + return false; + } + uint64_t Size = Entry.Location.GetSize(); + if (Size == 0) + { + 
OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + + bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); + return false; + } + if (Entry.Flags & CasDiskIndexEntry::kTombstone) + { + return true; + } + if (Entry.ContentType != ZenContentType::kUnknownContentType) + { + OutReason = + fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); + return false; + } + uint64_t Size = Entry.Location.GetSize(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + +} // namespace + +////////////////////////////////////////////////////////////////////////// + CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) : GcStorage(Gc) , m_Config(Config) @@ -43,13 +244,16 @@ CasContainerStrategy::~CasContainerStrategy() } void -CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore) +CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); + ZEN_ASSERT(MaxBlockSize > 0); m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; + m_MaxBlockSize = MaxBlockSize; + m_BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName); OpenContainer(IsNewStore); @@ -59,36 +263,79 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6 CasStore::InsertResult 
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { + uint32_t WriteBlockIndex; + Ref<BlockStoreFile> WriteBlock; + uint64_t InsertOffset; { - RwLock::SharedLockScope _(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); + RwLock::ExclusiveLockScope _(m_InsertLock); - if (KeyIt != m_LocationMap.end()) { - return CasStore::InsertResult{.New = false}; + RwLock::SharedLockScope __(m_LocationMapLock); + if (m_LocationMap.contains(ChunkHash)) + { + return CasStore::InsertResult{.New = false}; + } } - } - - // New entry - - RwLock::ExclusiveLockScope _(m_InsertLock); - const uint64_t InsertOffset = m_CurrentInsertOffset; - m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset); + // New entry - m_CurrentInsertOffset = (m_CurrentInsertOffset + ChunkSize + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); - - RwLock::ExclusiveLockScope __(m_LocationMapLock); - - const CasDiskLocation Location{InsertOffset, ChunkSize}; - - m_LocationMap[ChunkHash] = Location; - - CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; + WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + bool IsWriting = m_WriteBlock != nullptr; + if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize) + { + if (m_WriteBlock) + { + m_WriteBlock = nullptr; + } + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + { + throw std::runtime_error( + fmt::format("unable to allocate a new block in '{}'", m_Config.RootDirectory / m_ContainerBaseName)); + } + WriteBlockIndex += IsWriting ? 
1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + m_WriteBlock = new BlockStoreFile(BlockPath); + m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + } + m_CurrentInsertOffset = 0; + m_WriteBlock->Create(m_MaxBlockSize); + } + InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); + WriteBlock = m_WriteBlock; + } - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize)); + // We can end up in a situation that InsertChunk writes the same chunk data in + // different locations. + // We release the insert lock once we have the correct WriteBlock ready and we know + // where to write the data. If a new InsertChunk request for the same chunk hash/data + // comes in before we update m_LocationMap below we will have a race. + // The outcome of that is that we will write the chunk data in more than one location + // but the chunk hash will only point to one of the chunks. + // We will in that case waste space until the next GC operation. + // + // This should be a rare occasion and the current flow reduces the time we block for + // reads, insert and GC. 
+ + BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment); + const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; + + WriteBlock->Write(ChunkData, ChunkSize, InsertOffset); m_CasLog.Append(IndexEntry); + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_release); + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, Location); + } + return CasStore::InsertResult{.New = true}; } @@ -101,31 +348,28 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { - RwLock::SharedLockScope _(m_LocationMapLock); - - if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) + Ref<BlockStoreFile> ChunkBlock; + BlockStoreLocation Location; { - const CasDiskLocation& Location = KeyIt->second; - - return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize()); + RwLock::SharedLockScope _(m_LocationMapLock); + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) + { + Location = KeyIt->second.Get(m_PayloadAlignment); + ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; + } + else + { + return IoBuffer(); + } } - - // Not found - - return IoBuffer(); + return ChunkBlock->GetChunk(Location.Offset, Location.Size); } bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); - - if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) - { - return true; - } - - return false; + return m_LocationMap.contains(ChunkHash); } void @@ -144,20 +388,23 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) void CasContainerStrategy::Flush() { - m_CasLog.Flush(); - m_SmallObjectIndex.Flush(); - m_SmallObjectFile.Flush(); + { + RwLock::ExclusiveLockScope _(m_InsertLock); + if 
(m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } + } + MakeIndexSnapshot(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { - const uint64_t WindowSize = 4 * 1024 * 1024; - uint64_t WindowStart = 0; - uint64_t WindowEnd = WindowSize; - const uint64_t FileSize = m_SmallObjectFile.FileSize(); - - std::vector<CasDiskIndexEntry> BigChunks; std::vector<CasDiskIndexEntry> BadChunks; // We do a read sweep through the payloads file and validate @@ -166,62 +413,73 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) // pass. An alternative strategy would be to use memory mapping. { - IoBuffer ReadBuffer{WindowSize}; - void* BufferBase = ReadBuffer.MutableData(); + std::vector<CasDiskIndexEntry> BigChunks; + const uint64_t WindowSize = 4 * 1024 * 1024; + IoBuffer ReadBuffer{WindowSize}; + void* BufferBase = ReadBuffer.MutableData(); - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time? 
+ RwLock::SharedLockScope __(m_LocationMapLock); - do + for (const auto& Block : m_ChunkBlocks) { - const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); - m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); + uint64_t WindowStart = 0; + uint64_t WindowEnd = WindowSize; + const Ref<BlockStoreFile>& BlockFile = Block.second; + BlockFile->Open(); + const uint64_t FileSize = BlockFile->FileSize(); - for (auto& Entry : m_LocationMap) + do { - const uint64_t EntryOffset = Entry.second.GetOffset(); + const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); + BlockFile->Read(BufferBase, ChunkSize, WindowStart); - if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) + for (auto& Entry : m_LocationMap) { - const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize(); + const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment); + const uint64_t EntryOffset = Location.Offset; - if (EntryEnd >= WindowEnd) + if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { - BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + const uint64_t EntryEnd = EntryOffset + Location.Size; - continue; - } + if (EntryEnd >= WindowEnd) + { + BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); - const IoHash ComputedHash = - IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart, - Entry.second.GetSize()); + continue; + } - if (Entry.first != ComputedHash) - { - // Hash mismatch + const IoHash ComputedHash = + IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, Location.Size); - BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + if (Entry.first != ComputedHash) + { + // Hash mismatch + BadChunks.push_back({.Key = Entry.first, .Location = Entry.second, .Flags = CasDiskIndexEntry::kTombstone}); + } } } - } - WindowStart += WindowSize; - WindowEnd += WindowSize; - } while (WindowStart < FileSize); - 
} - - // Deal with large chunks + WindowStart += WindowSize; + WindowEnd += WindowSize; + } while (WindowStart < FileSize); + } - for (const CasDiskIndexEntry& Entry : BigChunks) - { - IoHashStream Hasher; - m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { - Hasher.Append(Data, Size); - }); - IoHash ComputedHash = Hasher.GetHash(); + // Deal with large chunks - if (Entry.Key != ComputedHash) + for (const CasDiskIndexEntry& Entry : BigChunks) { - BadChunks.push_back(Entry); + IoHashStream Hasher; + const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment); + const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex]; + BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + + if (Entry.Key != ComputedHash) + { + BadChunks.push_back({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); + } } } @@ -230,17 +488,21 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) return; } - ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName); + ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_Config.RootDirectory / m_ContainerBaseName); // Deal with bad chunks by removing them from our lookup map std::vector<IoHash> BadChunkHashes; + BadChunkHashes.reserve(BadChunks.size()); - for (const CasDiskIndexEntry& Entry : BadChunks) + m_CasLog.Append(BadChunks); { - BadChunkHashes.push_back(Entry.Key); - m_CasLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); - m_LocationMap.erase(Entry.Key); + RwLock::ExclusiveLockScope _(m_LocationMapLock); + for (const CasDiskIndexEntry& Entry : BadChunks) + { + BadChunkHashes.push_back(Entry.Key); + m_LocationMap.erase(Entry.Key); + } } // Let whomever it concerns know about the bad chunks. 
This could @@ -253,243 +515,1106 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) void CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { - namespace fs = std::filesystem; - - // A naive garbage collection implementation that just copies evicted chunks - // into a new container file. We probably need to partition the container file - // into several parts to prevent needing to keep the entire container file during GC. + // It collects all the blocks that we want to delete chunks from. For each such + // block we keep a list of chunks to retain and a list of chunks to delete. + // + // If there is a block that we are currently writing to, that block is omitted + // from the garbage collection. + // + // Next it will iterate over all blocks that we want to remove chunks from. + // If the block is empty after removal of chunks we mark the block as pending + // delete - we want to delete it as soon as there are no IoBuffers using the + // block file. + // Once complete we update the m_LocationMap by removing the chunks. + // + // If the block is non-empty we write out the chunks we want to keep to a new + // block file (creating new block files as needed). + // + // We update the index as we complete each new block file. This makes it possible + // to break the GC if we want to limit time for execution. + // + // GC can run fairly parallel to regular operation - it will block while taking + // a snapshot of the current m_LocationMap state. + // + // While moving blocks it will do a blocking operation and update the m_LocationMap + // after each new block is written and the path to the next new block is figured out.
ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + std::vector<IoHash> DeletedChunks; + uint64_t MovedCount = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([this, + &TotalTimer, + &WriteBlockTimeUs, + &WriteBlockLongestTimeUs, + &ReadBlockTimeUs, + &ReadBlockLongestTimeUs, + &TotalChunkCount, + &DeletedChunks, + &MovedCount, + &DeletedSize, + OldTotalSize] { + ZEN_INFO( + "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " + "#{} " + "of #{} " + "chunks ({}).", + m_Config.RootDirectory / m_ContainerBaseName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedChunks.size(), + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + }); - RwLock::ExclusiveLockScope _(m_LocationMapLock); + LocationMap_t LocationMap; + size_t BlockCount; + uint64_t ExcludeBlockIndex = 0x800000000ull; + { + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_LocationMapLock); + { + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_WriteBlock) + { + ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + } + __.ReleaseNow(); + } + LocationMap = m_LocationMap; + BlockCount = m_ChunkBlocks.size(); + } + + if (LocationMap.empty()) + { + ZEN_INFO("garbage 
collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName); + return; + } - Flush(); + TotalChunkCount = LocationMap.size(); - std::vector<IoHash> Candidates; - std::vector<IoHash> ChunksToKeep; - std::vector<IoHash> ChunksToDelete; - const uint64_t ChunkCount = m_LocationMap.size(); - uint64_t TotalSize{}; + std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; + std::vector<std::vector<IoHash>> KeepChunks; + std::vector<std::vector<IoHash>> DeleteChunks; - Candidates.reserve(m_LocationMap.size()); + BlockIndexToChunkMapIndex.reserve(BlockCount); + KeepChunks.reserve(BlockCount); + DeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; - for (auto& Entry : m_LocationMap) + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : LocationMap) { - Candidates.push_back(Entry.first); - TotalSize += Entry.second.GetSize(); + TotalChunkHashes.push_back(Entry.first); } - ChunksToKeep.reserve(Candidates.size()); - GcCtx.FilterCas(Candidates, [&ChunksToKeep, &ChunksToDelete](const IoHash& Hash, bool Keep) { + uint64_t DeleteCount = 0; + + uint64_t NewTotalSize = 0; + GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = LocationMap.find(ChunkHash); + const BlockStoreDiskLocation& Location = KeyIt->second; + uint32_t BlockIndex = Location.GetBlockIndex(); + + if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex) + { + return; + } + + auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex); + size_t ChunkMapIndex = 0; + if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) + { + ChunkMapIndex = KeepChunks.size(); + BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; + KeepChunks.resize(ChunkMapIndex + 1); + KeepChunks.back().reserve(GuesstimateCountPerBlock); + DeleteChunks.resize(ChunkMapIndex + 1); + DeleteChunks.back().reserve(GuesstimateCountPerBlock); + } + else + { + 
ChunkMapIndex = BlockIndexPtr->second; + } if (Keep) { - ChunksToKeep.push_back(Hash); + std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex]; + ChunkMap.push_back(ChunkHash); + NewTotalSize += Location.GetSize(); } else { - ChunksToDelete.push_back(Hash); + std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; + ChunkMap.push_back(ChunkHash); + DeleteCount++; } }); - if (m_LocationMap.empty() || ChunksToKeep.size() == m_LocationMap.size()) + std::unordered_set<uint32_t> BlocksToReWrite; + BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); + for (const auto& Entry : BlockIndexToChunkMapIndex) { - ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete", - ChunkCount, - NiceBytes(TotalSize), - m_Config.RootDirectory / m_ContainerBaseName); + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; + if (ChunkMap.empty()) + { + continue; + } + BlocksToReWrite.insert(BlockIndex); + } + + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); + ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_Config.RootDirectory / m_ContainerBaseName, + DeleteCount, + NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); return; } - const uint64_t NewChunkCount = ChunksToKeep.size(); - uint64_t NewTotalSize = 0; + // Move all chunks in blocks that have chunks removed to new blocks + + Ref<BlockStoreFile> NewBlockFile; + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; + DeletedChunks.reserve(DeleteCount); - for (const IoHash& Key : ChunksToKeep) + auto UpdateLocations = [this](const std::span<CasDiskIndexEntry>& Entries) { + for (const CasDiskIndexEntry& Entry : Entries) + { + if (Entry.Flags & CasDiskIndexEntry::kTombstone) + { + auto KeyIt = 
m_LocationMap.find(Entry.Key); + uint64_t ChunkSize = KeyIt->second.GetSize(); + m_TotalSize.fetch_sub(ChunkSize); + m_LocationMap.erase(KeyIt); + continue; + } + m_LocationMap[Entry.Key] = Entry.Location; + } + }; + + std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks; + for (uint32_t BlockIndex : BlocksToReWrite) { - const CasDiskLocation& Loc = m_LocationMap[Key]; - NewTotalSize += Loc.GetSize(); + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; + + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _i(m_LocationMapLock); + OldBlockFile = m_ChunkBlocks[BlockIndex]; + } + + const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex]; + if (KeepMap.empty()) + { + const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; + std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap); + m_CasLog.Append(LogEntries); + m_CasLog.Flush(); + { + RwLock::ExclusiveLockScope _i(m_LocationMapLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + m_ChunkBlocks[BlockIndex] = nullptr; + } + DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); + ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", + m_ContainerBaseName, + BlockIndex, + OldBlockFile->GetPath()); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + continue; + } + + std::vector<uint8_t> Chunk; + for (const IoHash& ChunkHash : KeepMap) + { + auto KeyIt = LocationMap.find(ChunkHash); + const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment); + Chunk.resize(ChunkLocation.Size); + 
OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) + { + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {}); + m_CasLog.Append(LogEntries); + m_CasLog.Flush(); + + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + } + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_Config.RootDirectory / m_ContainerBaseName, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; + } + while (m_ChunkBlocks.contains(NextBlockIndex)) + { + NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + } + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + } + + MovedCount += MovedBlockChunks.size(); + MovedBlockChunks.clear(); + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); + return; + } + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", + m_Config.RootDirectory / m_ContainerBaseName, + 
m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks.erase(NextBlockIndex); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_Config.RootDirectory / m_ContainerBaseName, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; + } + + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedBlockChunks.emplace( + ChunkHash, + BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment)); + WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); + } + Chunk.clear(); + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + NewBlockFile = {}; + } + + const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; + std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap); + m_CasLog.Append(LogEntries); + m_CasLog.Flush(); + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + m_ChunkBlocks[BlockIndex] = nullptr; + } + MovedCount += MovedBlockChunks.size(); + DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); + MovedBlockChunks.clear(); + + ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_ContainerBaseName, 
BlockIndex, OldBlockFile->GetPath()); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + OldBlockFile = nullptr; } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); - if (Error) + for (const IoHash& ChunkHash : DeletedChunks) { - ZEN_ERROR("get disk space FAILED, reason '{}'", Error.message()); - return; + DeletedSize += LocationMap[ChunkHash].GetSize(); } - if (Space.Free < NewTotalSize + (64 << 20)) - { - ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", + GcCtx.DeletedCas(DeletedChunks); +} + +void +CasContainerStrategy::MakeIndexSnapshot() +{ + ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([this, &EntryCount, &Timer] { + ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(NewTotalSize), - NiceBytes(Space.Free)); - return; - } + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + fs::path TempIndexPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - if (!CollectSmallObjects) + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(TempIndexPath)) { - ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", - m_Config.RootDirectory / m_ContainerBaseName, - ChunkCount - NewChunkCount, - NiceBytes(TotalSize - NewTotalSize), - ChunkCount, - NiceBytes(TotalSize)); - return; + fs::remove(TempIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, TempIndexPath); } - 
fs::path TmpSobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ucas"); - fs::path TmpSlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ulog"); - + try { - ZEN_DEBUG("creating temporary container cas '{}'...", TmpSobsPath); + m_CasLog.Flush(); + + // Write the current state of the location map to a new index state + uint64_t LogCount = 0; + std::vector<CasDiskIndexEntry> Entries; - TCasLogFile<CasDiskIndexEntry> TmpLog; - BasicFile TmpObjectFile; - bool IsNew = true; + { + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_LocationMapLock); + Entries.resize(m_LocationMap.size()); - TmpLog.Open(TmpSlogPath, IsNew); - TmpObjectFile.Open(TmpSobsPath, IsNew); + uint64_t EntryIndex = 0; + for (auto& Entry : m_LocationMap) + { + CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second; + } - std::vector<uint8_t> Chunk; - uint64_t NextInsertOffset{}; + LogCount = m_CasLog.GetLogCount(); + } - for (const IoHash& Key : ChunksToKeep) - { - const auto Entry = m_LocationMap.find(Key); - const auto& Loc = Entry->second; + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CasDiskIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); - Chunk.resize(Loc.GetSize()); - m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), Loc.GetOffset()); + // File layout: the header occupies the first sizeof(CasDiskIndexHeader) bytes and the entry array starts right after it, which is the offset ReadIndexFile() loads the entries from. + ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - const uint64_t InsertOffset = NextInsertOffset; - TmpObjectFile.Write(Chunk.data(), 
Chunk.size(), InsertOffset); - TmpLog.Append({.Key = Key, .Location = {InsertOffset, Chunk.size()}}); + // Restore any previous snapshot - NextInsertOffset = (NextInsertOffset + Chunk.size() + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(IndexPath); + fs::rename(TempIndexPath, IndexPath); } } + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(TempIndexPath); + } +} - try +uint64_t +CasContainerStrategy::ReadIndexFile() +{ + std::vector<CasDiskIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(IndexPath)) { - CloseContainer(); + Stopwatch Timer; + const auto _ = MakeGuard([this, &Entries, &Timer] { + ZEN_INFO("read store '{}' index containing #{} entries in {}", + m_Config.RootDirectory / m_ContainerBaseName, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - fs::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); - fs::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); - fs::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CasDiskIndexHeader)) + { + // Entries are read from offset sizeof(CasDiskIndexHeader), so only the bytes past the header bound how many whole entries the file can hold. + uint64_t ExpectedEntryCount = (Size - sizeof(CasDiskIndexHeader)) / sizeof(CasDiskIndexEntry); + CasDiskIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && + (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) + { + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + 
m_PayloadAlignment = Header.PayloadAlignment; - fs::remove(SobsPath); - fs::remove(SidxPath); - fs::remove(SlogPath); + std::string InvalidEntryReason; + for (const CasDiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_LocationMap[Entry.Key] = Entry.Location; + } - fs::rename(TmpSobsPath, SobsPath); - fs::rename(TmpSlogPath, SlogPath); + return Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + } + } + return 0; +} + +uint64_t +CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) +{ + // Number of log records handed to Replay(); reported by the timing guard when the function exits. + uint64_t ReplayedEntryCount = 0; + std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(LogPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([this, &ReplayedEntryCount, &Timer] { + ZEN_INFO("read store '{}' log containing #{} entries in {}", + m_Config.RootDirectory / m_ContainerBaseName, + ReplayedEntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) { - // Create a new empty index file - BasicFile SidxFile; - SidxFile.Open(SidxPath, true); + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + uint64_t ReadCount = EntryCount - SkipEntryCount; + ReplayedEntryCount = ReadCount; + CasLog.Replay( + [&](const CasDiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & CasDiskIndexEntry::kTombstone) + { + m_LocationMap.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + return; + } + 
m_LocationMap[Record.Key] = Record.Location; + }, + SkipEntryCount); + return ReadCount; } + } + return 0; +} - OpenContainer(false /* IsNewStore */); +uint64_t +CasContainerStrategy::MigrateLegacyData(bool CleanSource) +{ + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); - GcCtx.DeletedCas(ChunksToDelete); + if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) + { + return 0; + } - ZEN_INFO("garbage collect from '{}' DONE, collected #{} {} chunks of total #{} {}", + ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName); + + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LegacyIndexPath = GetLegacyIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + + uint64_t MigratedChunkCount = 0; + uint32_t MigratedBlockCount = 0; + Stopwatch MigrationTimer; + uint64_t TotalSize = 0; + const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { + ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", m_Config.RootDirectory / m_ContainerBaseName, - ChunkCount - NewChunkCount, - NiceBytes(TotalSize - NewTotalSize), - ChunkCount, + MigratedChunkCount, + MigratedBlockCount, + NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), NiceBytes(TotalSize)); + }); + + uint32_t WriteBlockIndex = 0; + while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; } - catch (std::exception& Err) + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); + if (Error) { - ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); + ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); + return 0; + } + + if (Space.Free < m_MaxBlockSize) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required 
disk space {}, free {}", + m_Config.RootDirectory / m_ContainerBaseName, + m_MaxBlockSize, + NiceBytes(Space.Free)); + return 0; + } + + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - // Something went wrong, try create a new container - OpenContainer(true /* IsNewStore */); + std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; + uint64_t InvalidEntryCount = 0; - GcCtx.DeletedCas(ChunksToDelete); - GcCtx.DeletedCas(ChunksToKeep); + TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; + LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); + { + Stopwatch Timer; + const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] { + ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", + m_Config.RootDirectory / m_ContainerBaseName, + LegacyDiskIndex.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + if (LegacyCasLog.Initialize()) + { + LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); + LegacyCasLog.Replay( + [&](const LegacyCasDiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) + { + LegacyDiskIndex.erase(Record.Key); + return; + } + if (!ValidateLegacyEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); + InvalidEntryCount++; + return; + } + LegacyDiskIndex.insert_or_assign(Record.Key, Record); + }, + 0); + + std::vector<IoHash> BadEntries; + uint64_t BlockFileSize = BlockFile.FileSize(); + for (const auto& Entry : LegacyDiskIndex) + { + const LegacyCasDiskIndexEntry& Record(Entry.second); + if (Record.Location.GetOffset() + Record.Location.GetSize() <= BlockFileSize) + { + continue; + } + ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); + BadEntries.push_back(Entry.first); + } + for (const IoHash& 
BadHash : BadEntries) + { + LegacyDiskIndex.erase(BadHash); + } + InvalidEntryCount += BadEntries.size(); + } } -} -void -CasContainerStrategy::MakeSnapshot() -{ - RwLock::SharedLockScope _(m_LocationMapLock); + if (InvalidEntryCount) + { + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_Config.RootDirectory / m_ContainerBaseName); + } - std::vector<CasDiskIndexEntry> Entries{m_LocationMap.size()}; + if (LegacyDiskIndex.empty()) + { + BlockFile.Close(); + LegacyCasLog.Close(); + if (CleanSource) + { + // Older versions of CasContainerStrategy expects the legacy files to exist if it can find + // a CAS manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + BasicFile LegacySidx; + LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate); + } + return 0; + } - uint64_t EntryIndex = 0; - for (auto& Entry : m_LocationMap) + for (const auto& Entry : LegacyDiskIndex) { - CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = Entry.second; + const LegacyCasDiskIndexEntry& Record(Entry.second); + TotalSize += Record.Location.GetSize(); } - m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); -} + uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size()); + uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize; + if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", + m_Config.RootDirectory / m_ContainerBaseName, + MaxRequiredBlockCount, + BlockStoreDiskLocation::MaxBlockIndex); + return 
0; + } -void -CasContainerStrategy::OpenContainer(bool IsNewStore) -{ - std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); - std::filesystem::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); - std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); + constexpr const uint64_t DiskReserve = 1ul << 28; - m_SmallObjectFile.Open(SobsPath, IsNewStore); - m_SmallObjectIndex.Open(SidxPath, IsNewStore); - m_CasLog.Open(SlogPath, IsNewStore); + if (CleanSource) + { + if (Space.Free < (m_MaxBlockSize + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_Config.RootDirectory / m_ContainerBaseName, + NiceBytes(m_MaxBlockSize + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } + else + { + if (Space.Free < (RequiredDiskSpace + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_Config.RootDirectory / m_ContainerBaseName, + NiceBytes(RequiredDiskSpace + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } - // TODO: should validate integrity of container files here + std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + CreateDirectories(LogPath.parent_path()); + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - m_CurrentInsertOffset = 0; - m_CurrentIndexOffset = 0; - m_TotalSize = 0; + if (CleanSource && (MaxRequiredBlockCount < 2)) + { + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(LegacyDiskIndex.size()); - m_LocationMap.clear(); + // We can use the block as is, just move it and add the blocks to our new log + for (auto& Entry : LegacyDiskIndex) + { + const LegacyCasDiskIndexEntry& Record(Entry.second); - uint64_t MaxFileOffset = 0; + BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), 
Record.Location.GetSize()); + BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment); + LogEntries.push_back( + {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + CreateDirectories(BlockPath.parent_path()); + BlockFile.Close(); + std::filesystem::rename(LegacyDataPath, BlockPath); + CasLog.Append(LogEntries); + for (const CasDiskIndexEntry& Entry : LogEntries) + { + m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); + } - m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { - if (Record.Flags & CasDiskIndexEntry::kTombstone) + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + } + else + { + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(LegacyDiskIndex.size()); + for (const auto& Entry : LegacyDiskIndex) { - m_TotalSize.fetch_sub(Record.Location.GetSize()); + ChunkHashes.push_back(Entry.first); } - else + + std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { + auto LhsKeyIt = LegacyDiskIndex.find(Lhs); + auto RhsKeyIt = LegacyDiskIndex.find(Rhs); + return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset(); + }); + + uint64_t BlockSize = 0; + uint64_t BlockOffset = 0; + std::vector<BlockStoreLocation> NewLocations; + struct BlockData { - m_TotalSize.fetch_add(Record.Location.GetSize()); - m_LocationMap[Record.Key] = Record.Location; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize()); + std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; + uint64_t BlockOffset; + uint64_t BlockSize; + uint32_t BlockIndex; + }; + + std::vector<BlockData> BlockRanges; + std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; + BlockRanges.reserve(MaxRequiredBlockCount); + for (const IoHash& ChunkHash : ChunkHashes) + { + const LegacyCasDiskIndexEntry& LegacyEntry = 
LegacyDiskIndex[ChunkHash]; + const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; + + uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); + uint64_t ChunkSize = LegacyChunkLocation.GetSize(); + uint64_t ChunkEnd = ChunkOffset + ChunkSize; + + if (BlockSize == 0) + { + BlockOffset = ChunkOffset; + } + if ((ChunkEnd - BlockOffset) > m_MaxBlockSize) + { + BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; + BlockRange.Chunks.swap(Chunks); + BlockRanges.push_back(BlockRange); + + WriteBlockIndex++; + while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + BlockOffset = ChunkOffset; + BlockSize = 0; + } + BlockSize = RoundUp(BlockSize, m_PayloadAlignment); + BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; + Chunks.push_back({ChunkHash, ChunkLocation}); + BlockSize = ChunkEnd - BlockOffset; } - }); + if (BlockSize > 0) + { + BlockRanges.push_back( + {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); + } + Stopwatch WriteBlockTimer; + + std::reverse(BlockRanges.begin(), BlockRanges.end()); + std::vector<std::uint8_t> Buffer(1 << 28); + for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) + { + const BlockData& BlockRange = BlockRanges[Idx]; + if (Idx > 0) + { + uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; + uint64_t Completed = BlockOffset + BlockSize - Remaining; + uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; + + ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", + m_Config.RootDirectory / m_ContainerBaseName, + Idx, + BlockRanges.size(), + NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), + NiceBytes(BlockOffset + BlockSize), + NiceTimeSpanMs(ETA)); + } + + std::filesystem::path BlockPath = 
GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); + BlockStoreFile ChunkBlock(BlockPath); + ChunkBlock.Create(BlockRange.BlockSize); + uint64_t Offset = 0; + while (Offset < BlockRange.BlockSize) + { + uint64_t Size = BlockRange.BlockSize - Offset; + if (Size > Buffer.size()) + { + Size = Buffer.size(); + } + BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); + ChunkBlock.Write(Buffer.data(), Size, Offset); + Offset += Size; + } + ChunkBlock.Truncate(Offset); + ChunkBlock.Flush(); + + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; + BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment); + LogEntries.push_back( + {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags}); + } + CasLog.Append(LogEntries); + for (const CasDiskIndexEntry& Entry : LogEntries) + { + m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); + } + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + + if (CleanSource) + { + std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; + LegacyLogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone}); + } + LegacyCasLog.Append(LegacyLogEntries); + BlockFile.SetFileSize(BlockRange.BlockOffset); + } + } + } - m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); - m_CurrentIndexOffset = m_SmallObjectIndex.FileSize(); + BlockFile.Close(); + LegacyCasLog.Close(); + CasLog.Close(); + + if (CleanSource) + { + // Older versions of CasContainerStrategy expects the legacy files to exist if it can find + // a CAS manifest and crashes on startup if they don't. 
+ // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + BasicFile LegacySidx; + LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate); + } + return MigratedChunkCount; } void -CasContainerStrategy::CloseContainer() +CasContainerStrategy::OpenContainer(bool IsNewStore) { - m_SmallObjectFile.Close(); - m_SmallObjectIndex.Close(); - m_CasLog.Close(); + // Add .running file and delete on clean on close to detect bad termination + m_TotalSize = 0; + + m_LocationMap.clear(); + + std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName); + + if (IsNewStore) + { + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); + + std::filesystem::remove(LegacyLogPath); + std::filesystem::remove(LegacyDataPath); + std::filesystem::remove_all(BasePath); + } + + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); + uint64_t LegacyLogEntryCount = MigrateLegacyData(true); + + CreateDirectories(BasePath); + + std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + std::unordered_set<uint32_t> KnownBlocks; + for (const auto& Entry : m_LocationMap) + { + const BlockStoreDiskLocation& Location = Entry.second; + m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_release); + KnownBlocks.insert(Location.GetBlockIndex()); + } + + if (std::filesystem::is_directory(m_BlocksBasePath)) + { + std::vector<std::filesystem::path> FoldersToScan; + FoldersToScan.push_back(m_BlocksBasePath); + size_t FolderOffset = 0; + while (FolderOffset 
< FoldersToScan.size()) + { + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) + { + if (Entry.is_directory()) + { + FoldersToScan.push_back(Entry.path()); + continue; + } + if (Entry.is_regular_file()) + { + const std::filesystem::path Path = Entry.path(); + if (Path.extension() != DataExtension) + { + continue; + } + std::string FileName = Path.stem().string(); + uint32_t BlockIndex; + bool OK = ParseHexNumber(FileName, BlockIndex); + if (!OK) + { + continue; + } + if (!KnownBlocks.contains(BlockIndex)) + { + // Log removing unreferenced block + // Clear out unused blocks + ZEN_INFO("removing unused block for '{}' at '{}'", m_ContainerBaseName, Path); + std::error_code Ec; + std::filesystem::remove(Path, Ec); + if (Ec) + { + ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); + } + continue; + } + Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); + BlockFile->Open(); + m_ChunkBlocks[BlockIndex] = BlockFile; + } + } + ++FolderOffset; + } + } + else + { + CreateDirectories(m_BlocksBasePath); + } + + if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0)) + { + MakeIndexSnapshot(); + } + + // TODO: should validate integrity of container files here } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS -TEST_CASE("cas.compact.gc") +namespace { + static IoBuffer CreateChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } +} // namespace + +TEST_CASE("compactcas.hex") +{ + uint32_t Value; + std::string HexString; + CHECK(!ParseHexNumber("", Value)); + char Hex[9]; + + ToHexNumber(0u, Hex); + HexString = 
std::string(Hex); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0u); + + ToHexNumber(std::numeric_limits<std::uint32_t>::max(), Hex); + HexString = std::string(Hex); + CHECK(HexString == "ffffffff"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == std::numeric_limits<std::uint32_t>::max()); + + ToHexNumber(0xadf14711u, Hex); + HexString = std::string(Hex); + CHECK(HexString == "adf14711"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0xadf14711u); + + ToHexNumber(0x80000000u, Hex); + HexString = std::string(Hex); + CHECK(HexString == "80000000"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0x80000000u); + + ToHexNumber(0x718293a4u, Hex); + HexString = std::string(Hex); + CHECK(HexString == "718293a4"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0x718293a4u); +} + +TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); const int kIterationCount = 1000; @@ -499,7 +1624,7 @@ TEST_CASE("cas.compact.gc") { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, true); + Cas.Initialize("test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { @@ -533,7 +1658,7 @@ TEST_CASE("cas.compact.gc") { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, false); + Cas.Initialize("test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { @@ -545,67 +1670,843 @@ TEST_CASE("cas.compact.gc") CHECK_EQ(Value["id"].AsInt32(), i); } - - GcContext Ctx; - Cas.CollectGarbage(Ctx); } } -TEST_CASE("cas.compact.totalsize") +TEST_CASE("compactcas.compact.totalsize") { std::random_device rd; std::mt19937 g(rd()); - const auto CreateChunk = [&](uint64_t Size) -> IoBuffer { - const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t)); - std::vector<uint32_t> Values; - Values.resize(Count); - for (size_t Idx = 
0; Idx < Count; ++Idx) + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const uint64_t kChunkSize = 1024; + const int32_t kChunkCount = 16; + { - Values[Idx] = static_cast<uint32_t>(Idx); + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 65536, 16, true); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + const IoHash Hash = HashBuffer(Chunk); + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + } + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } - std::shuffle(Values.begin(), Values.end(), g); - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t)); - }; + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 65536, 16, false); + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } + + // Re-open again, this time we should have a snapshot + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 65536, 16, false); + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } + } +} + +TEST_CASE("compactcas.gc.basic") +{ ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("cb", 65536, 1 << 4, true); + + IoBuffer Chunk = CreateChunk(128); + IoHash ChunkHash = IoHash::HashBuffer(Chunk); + const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); + CHECK(InsertResult.New); + Cas.Flush(); + + GcContext GcCtx; + 
GcCtx.CollectSmallObjects(true); + + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHash)); +} + +TEST_CASE("compactcas.gc.removefile") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); - const uint64_t kChunkSize = 1024; - const int32_t kChunkCount = 16; + IoBuffer Chunk = CreateChunk(128); + IoHash ChunkHash = IoHash::HashBuffer(Chunk); + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("cb", 65536, 1 << 4, true); + + const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); + CHECK(InsertResult.New); + const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); + CHECK(!InsertResultDup.New); + Cas.Flush(); + } + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("cb", 65536, 1 << 4, false); + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHash)); +} + +TEST_CASE("compactcas.gc.compact") +{ + // for (uint32_t i = 0; i < 100; ++i) { + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, true); + Cas.Initialize("cb", 2048, 1 << 4, true); - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(9); + for (uint64_t Size : ChunkSizes) { - IoBuffer Chunk = CreateChunk(kChunkSize); - const IoHash Hash = HashBuffer(Chunk); - auto InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); + Chunks.push_back(CreateChunk(Size)); } - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + std::vector<IoHash> ChunkHashes; + 
ChunkHashes.reserve(9); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New); + CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New); + CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New); + CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New); + CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New); + CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New); + CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New); + CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New); + CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(Cas.HaveChunk(ChunkHashes[1])); + CHECK(Cas.HaveChunk(ChunkHashes[2])); + CHECK(Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + uint64_t InitialSize = Cas.StorageSize().DiskSize; + + // Keep first and last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + } + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + 
Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + + // Keep last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Keep mixed + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[1]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[7]); + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(!Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); + 
CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + + Cas.InsertChunk(Chunks[0], ChunkHashes[0]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[8], ChunkHashes[8]); + } + + // Keep multiple at end + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[7]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[0], ChunkHashes[0]); + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + } + + // Keep every other + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[2]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + 
Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Verify that we nicely appended blocks even after all GC operations + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + uint64_t FinalSize = Cas.StorageSize().DiskSize; + CHECK(InitialSize == FinalSize); } +} + +TEST_CASE("compactcas.gc.deleteblockonopen") +{ + ScopedTemporaryDirectory TempDir; + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 
37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (uint64_t Size : ChunkSizes) { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 1024, 16, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + // GC every other block + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < 20; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } + } + { + // Re-open + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 1024, 16, false); + + for (size_t i = 0; i < 20; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } +} + +TEST_CASE("compactcas.gc.handleopeniobuffer") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (const uint64_t& Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const IoBuffer& Chunk : 
Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 1024, 16, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]); + Cas.Flush(); + + // GC everything + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + Cas.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i++) + { + CHECK(!Cas.HaveChunk(ChunkHashes[i])); + } + + CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); +} + +TEST_CASE("compactcas.legacyconversion") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339, 561, 16, 16, 2048, 2048}; + size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); + size_t SingleBlockSize = 0; + std::vector<IoBuffer> Chunks; + Chunks.reserve(ChunkCount); + for (uint64_t Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + SingleBlockSize += Size; + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(ChunkCount); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", gsl::narrow<uint32_t>(SingleBlockSize * 2), 16, true); + + for (size_t i = 0; i < ChunkCount; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < ChunkCount; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + 
GcCtx.ContributeCas(KeepChunks); + Cas.Flush(); + Gc.CollectGarbage(GcCtx); + } + + std::filesystem::path BlockPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); + std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test"); + std::filesystem::rename(BlockPath, LegacyDataPath); + + std::vector<CasDiskIndexEntry> LogEntries; + std::filesystem::path IndexPath = GetIndexPath(CasConfig.RootDirectory, "test"); + if (std::filesystem::is_regular_file(IndexPath)) + { + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CasDiskIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); + CasDiskIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion && + Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) + { + LogEntries.resize(Header.EntryCount); + ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + } + } + ObjectIndexFile.Close(); + std::filesystem::remove(IndexPath); + } + + std::filesystem::path LogPath = GetLogPath(CasConfig.RootDirectory, "test"); + { + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + LogEntries.reserve(CasLog.GetLogCount()); + CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); + } + TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; + std::filesystem::path LegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test"); + LegacyCasLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); + + for (const CasDiskIndexEntry& Entry : LogEntries) + { + BlockStoreLocation Location = Entry.Location.Get(16); + LegacyCasDiskLocation 
LegacyLocation(Location.Offset, Location.Size); + LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key, + .Location = LegacyLocation, + .ContentType = Entry.ContentType, + .Flags = Entry.Flags}; + LegacyCasLog.Append(LegacyEntry); + } + LegacyCasLog.Close(); + + std::filesystem::remove_all(CasConfig.RootDirectory / "test"); + + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 2048, 16, false); + + for (size_t i = 0; i < ChunkCount; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } +} + +TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) +{ + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 8192; + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(kChunkCount); + std::vector<IoBuffer> Chunks; + Chunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + ChunkHashes.emplace_back(Hash); + Chunks.emplace_back(Chunk); + } + + WorkerThreadPool ThreadPool(4); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, false); + Cas.Initialize("test", 32768, 16, true); + { + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + const IoBuffer& Chunk = Chunks[Idx]; + const IoHash& Hash = ChunkHashes[Idx]; + ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + + { + 
std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + IoBuffer Chunk = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Chunk); + CHECK(ChunkHash == Hash); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } + + std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + { + std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + std::vector<IoHash> NewChunkHashes; + NewChunkHashes.reserve(kChunkCount); + std::vector<IoBuffer> NewChunks; + NewChunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunkHashes.emplace_back(Hash); + NewChunks.emplace_back(Chunk); + } + + RwLock ChunkHashesLock; + std::atomic_uint32_t AddedChunkCount; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + const IoBuffer& Chunk = NewChunks[Idx]; + const IoHash& Hash = NewChunkHashes[Idx]; + ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + AddedChunkCount.fetch_add(1); + }); + ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]); + if (Chunk) + { + CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); + } + }); + } + + while (AddedChunkCount.load() < kChunkCount) + { + std::vector<IoHash> AddedHashes; + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const IoHash& ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { 
+ GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + + { + std::vector<IoHash> AddedHashes; + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const IoHash& ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 77 == 0 && C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + } + { + for (const IoHash& ChunkHash : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Cas, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } } } 
+TEST_CASE("compactcas.migrate.large.data" * doctest::skip(true)) +{ + const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas"; + std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs"); + std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs"); + std::filesystem::remove_all(TobsBasePath); + std::filesystem::remove_all(SobsBasePath); + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = BigDataPath; + uint64_t TObsSize = 0; + { + CasGc TobsCasGc; + CasContainerStrategy TobsCas(CasConfig, TobsCasGc); + TobsCas.Initialize("tobs", 1u << 28, 16, false); + TObsSize = TobsCas.StorageSize().DiskSize; + CHECK(TObsSize > 0); + } + + uint64_t SObsSize = 0; + { + CasGc SobsCasGc; + CasContainerStrategy SobsCas(CasConfig, SobsCasGc); + SobsCas.Initialize("sobs", 1u << 30, 4096, false); + SObsSize = SobsCas.StorageSize().DiskSize; + CHECK(SObsSize > 0); + } + + CasGc TobsCasGc; + CasContainerStrategy TobsCas(CasConfig, TobsCasGc); + TobsCas.Initialize("tobs", 1u << 28, 16, false); + GcContext TobsGcCtx; + TobsCas.CollectGarbage(TobsGcCtx); + CHECK(TobsCas.StorageSize().DiskSize == TObsSize); + + CasGc SobsCasGc; + CasContainerStrategy SobsCas(CasConfig, SobsCasGc); + SobsCas.Initialize("sobs", 1u << 30, 4096, false); + GcContext SobsGcCtx; + SobsCas.CollectGarbage(SobsGcCtx); + CHECK(SobsCas.StorageSize().DiskSize == SObsSize); +} + #endif void |