diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenstore/compactcas.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 1511 |
1 files changed, 1511 insertions, 0 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp new file mode 100644 index 000000000..7b2c21b0f --- /dev/null +++ b/src/zenstore/compactcas.cpp @@ -0,0 +1,1511 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "compactcas.h" + +#include "cas.h" + +#include <zencore/compress.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zenstore/scrubcontext.h> + +#include <gsl/gsl-lite.hpp> + +#include <xxhash.h> + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <zenstore/cidstore.h> +# include <algorithm> +# include <random> +#endif + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +struct CasDiskIndexHeader +{ + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } +}; + +static_assert(sizeof(CasDiskIndexHeader) == 32); + +namespace { + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".ulog"; + + std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / ContainerBaseName; + } + + std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); + } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); + } + + std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / "blocks"; + } + + bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); + return false; + } + if (Entry.Flags & CasDiskIndexEntry::kTombstone) + { + return true; + } + if (Entry.ContentType != ZenContentType::kUnknownContentType) + { + OutReason = + fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); + return false; + } + uint64_t Size = Entry.Location.GetSize(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + +} // namespace + +////////////////////////////////////////////////////////////////////////// + +CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas")) +{ +} + +CasContainerStrategy::~CasContainerStrategy() +{ +} + +void +CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory, + const std::string_view ContainerBaseName, + uint32_t MaxBlockSize, + uint64_t Alignment, + bool IsNewStore) +{ + ZEN_ASSERT(IsPow2(Alignment)); + ZEN_ASSERT(!m_IsInitialized); + ZEN_ASSERT(MaxBlockSize > 0); + + m_RootDirectory = RootDirectory; + m_ContainerBaseName = ContainerBaseName; + m_PayloadAlignment = Alignment; + m_MaxBlockSize = MaxBlockSize; + m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName); + + OpenContainer(IsNewStore); + + m_IsInitialized = true; +} + +CasStore::InsertResult +CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) +{ + { + RwLock::SharedLockScope _(m_LocationMapLock); + if (m_LocationMap.contains(ChunkHash)) + { + return CasStore::InsertResult{.New = false}; + } + } + + // We can end up in a situation that InsertChunk writes the same chunk data in + // different locations. + // We release the insert lock once we have the correct WriteBlock ready and we know + // where to write the data. If a new InsertChunk request for the same chunk hash/data + // comes in before we update m_LocationMap below we will have a race. + // The outcome of that is that we will write the chunk data in more than one location + // but the chunk hash will only point to one of the chunks. + // We will in that case waste space until the next GC operation. + // + // This should be a rare occasion and the current flow reduces the time we block for + // reads, insert and GC. + + m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) { + BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); + const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; + m_CasLog.Append(IndexEntry); + { + RwLock::ExclusiveLockScope _(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, DiskLocation); + } + }); + + return CasStore::InsertResult{.New = true}; +} + +CasStore::InsertResult +CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) +{ +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); +} + +IoBuffer +CasContainerStrategy::FindChunk(const IoHash& ChunkHash) +{ + RwLock::SharedLockScope _(m_LocationMapLock); + auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) + { + return IoBuffer(); + } + const BlockStoreLocation& Location = KeyIt->second.Get(m_PayloadAlignment); + + IoBuffer Chunk = m_BlockStore.TryGetChunk(Location); + return Chunk; +} + +bool +CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) +{ + RwLock::SharedLockScope _(m_LocationMapLock); + return m_LocationMap.contains(ChunkHash); +} + +void +CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) +{ + // This implementation is good enough for relatively small + // chunk sets (in terms of chunk identifiers), but would + // benefit from a better implementation which removes + // items incrementally for large sets, especially when + // we're likely to already have a large proportion of the + // chunks in the set + + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); +} + +void +CasContainerStrategy::Flush() +{ + m_BlockStore.Flush(); + m_CasLog.Flush(); + MakeIndexSnapshot(); +} + +void +CasContainerStrategy::Scrub(ScrubContext& Ctx) +{ + std::vector<IoHash> BadKeys; + uint64_t ChunkCount{0}, ChunkBytes{0}; + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + + RwLock::SharedLockScope _(m_LocationMapLock); + + uint64_t TotalChunkCount = m_LocationMap.size(); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + { + for (const auto& Entry : m_LocationMap) + { + const IoHash& ChunkHash = Entry.first; + const BlockStoreDiskLocation& DiskLocation = Entry.second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash.push_back(ChunkHash); + } + } + + const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + if (!Data) + { + // ChunkLocation out of range of stored blocks + BadKeys.push_back(Hash); + return; + } + + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + if (RawHash != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS + IoHash ComputedHash = IoHash::HashBuffer(Data, Size); + if (ComputedHash == Hash) + { + return; + } +#endif + BadKeys.push_back(Hash); + }; + + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + + IoHash RawHash; + uint64_t RawSize; + // TODO: Add API to verify compressed buffer without having to memorymap the whole file + if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + if (RawHash != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS + IoHashStream Hasher; + File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + if (ComputedHash == Hash) + { + return; + } +#endif + BadKeys.push_back(Hash); + }; + + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + + _.ReleaseNow(); + + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + + if (!BadKeys.empty()) + { + ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); + + if (Ctx.RunRecovery()) + { + // Deal with bad chunks by removing them from our lookup map + + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + for (const IoHash& ChunkHash : BadKeys) + { + const auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) + { + // Might have been GC'd + continue; + } + LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); + m_LocationMap.erase(KeyIt); + } + } + m_CasLog.Append(LogEntries); + } + } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCidChunks(BadKeys); + + ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); +} + +void +CasContainerStrategy::CollectGarbage(GcContext& GcCtx) +{ + // It collects all the blocks that we want to delete chunks from. For each such + // block we keep a list of chunks to retain and a list of chunks to delete. + // + // If there is a block that we are currently writing to, that block is omitted + // from the garbage collection. + // + // Next it will iterate over all blocks that we want to remove chunks from. + // If the block is empty after removal of chunks we mark the block as pending + // delete - we want to delete it as soon as there are no IoBuffers using the + // block file. + // Once complete we update the m_LocationMap by removing the chunks. + // + // If the block is non-empty we write out the chunks we want to keep to a new + // block file (creating new block files as needed). + // + // We update the index as we complete each new block file. This makes it possible + // to break the GC if we want to limit time for execution. + // + // GC can very parallell to regular operation - it will block while taking + // a snapshot of the current m_LocationMap state and while moving blocks it will + // do a blocking operation and update the m_LocationMap after each new block is + // written and figuring out the path to the next new block. + + ZEN_DEBUG("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); + + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + + LocationMap_t LocationMap; + BlockStore::ReclaimSnapshotState BlockStoreState; + { + RwLock::SharedLockScope ___(m_LocationMapLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + LocationMap = m_LocationMap; + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + } + + uint64_t TotalChunkCount = LocationMap.size(); + + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : LocationMap) + { + TotalChunkHashes.push_back(Entry.first); + } + + std::vector<BlockStoreLocation> ChunkLocations; + BlockStore::ChunkIndexArray KeepChunkIndexes; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + + GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = LocationMap.find(ChunkHash); + const BlockStoreDiskLocation& DiskLocation = KeyIt->second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); + + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + if (Keep) + { + KeepChunkIndexes.push_back(ChunkIndex); + } + }); + + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); + return; + } + + std::vector<IoHash> DeletedChunks; + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}}); + } + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash]; + LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone}); + DeletedChunks.push_back(ChunkHash); + } + + m_CasLog.Append(LogEntries); + m_CasLog.Flush(); + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + for (const CasDiskIndexEntry& Entry : LogEntries) + { + if (Entry.Flags & CasDiskIndexEntry::kTombstone) + { + m_LocationMap.erase(Entry.Key); + continue; + } + m_LocationMap[Entry.Key] = Entry.Location; + } + } + }, + [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); + + GcCtx.AddDeletedCids(DeletedChunks); +} + +void +CasContainerStrategy::MakeIndexSnapshot() +{ + uint64_t LogCount = m_CasLog.GetLogCount(); + if (m_LogFlushPosition == LogCount) + { + return; + } + + ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", + m_RootDirectory / m_ContainerBaseName, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); + fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); + + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(TempIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, TempIndexPath); + } + + try + { + // Write the current state of the location map to a new index state + std::vector<CasDiskIndexEntry> Entries; + + { + RwLock::SharedLockScope ___(m_LocationMapLock); + Entries.resize(m_LocationMap.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_LocationMap) + { + CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second; + } + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CasDiskIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + m_LogFlushPosition = LogCount; + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(IndexPath); + fs::rename(TempIndexPath, IndexPath); + } + } + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(TempIndexPath); + } +} + +uint64_t +CasContainerStrategy::ReadIndexFile() +{ + std::vector<CasDiskIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(IndexPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", + IndexPath, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CasDiskIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); + CasDiskIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && + (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) + { + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + m_PayloadAlignment = Header.PayloadAlignment; + + std::string InvalidEntryReason; + for (const CasDiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_LocationMap[Entry.Key] = Entry.Location; + } + + return Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + } + } + return 0; +} + +uint64_t +CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) +{ + std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(LogPath)) + { + size_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", + m_RootDirectory / m_ContainerBaseName, + LogEntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + LogEntryCount = EntryCount - SkipEntryCount; + CasLog.Replay( + [&](const CasDiskIndexEntry& Record) { + LogEntryCount++; + std::string InvalidEntryReason; + if (Record.Flags & CasDiskIndexEntry::kTombstone) + { + m_LocationMap.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + return; + } + m_LocationMap[Record.Key] = Record.Location; + }, + SkipEntryCount); + return LogEntryCount; + } + } + return 0; +} + +void +CasContainerStrategy::OpenContainer(bool IsNewStore) +{ + // Add .running file and delete on clean on close to detect bad termination + + m_LocationMap.clear(); + + std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName); + + if (IsNewStore) + { + std::filesystem::remove_all(BasePath); + } + + m_LogFlushPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); + + CreateDirectories(BasePath); + + std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + std::vector<BlockStoreLocation> KnownLocations; + KnownLocations.reserve(m_LocationMap.size()); + for (const auto& Entry : m_LocationMap) + { + const BlockStoreDiskLocation& Location = Entry.second; + KnownLocations.push_back(Location.Get(m_PayloadAlignment)); + } + + m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); + + if (IsNewStore || (LogEntryCount > 0)) + { + MakeIndexSnapshot(); + } + + // TODO: should validate integrity of container files here +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +namespace { + static IoBuffer CreateRandomChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } +} // namespace + +TEST_CASE("compactcas.hex") +{ + uint32_t Value; + std::string HexString; + CHECK(!ParseHexNumber("", Value)); + char Hex[9]; + + ToHexNumber(0u, Hex); + HexString = std::string(Hex); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0u); + + ToHexNumber(std::numeric_limits<std::uint32_t>::max(), Hex); + HexString = std::string(Hex); + CHECK(HexString == "ffffffff"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == std::numeric_limits<std::uint32_t>::max()); + + ToHexNumber(0xadf14711u, Hex); + HexString = std::string(Hex); + CHECK(HexString == "adf14711"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0xadf14711u); + + ToHexNumber(0x80000000u, Hex); + HexString = std::string(Hex); + CHECK(HexString == "80000000"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0x80000000u); + + ToHexNumber(0x718293a4u, Hex); + HexString = std::string(Hex); + CHECK(HexString == "718293a4"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0x718293a4u); +} + +TEST_CASE("compactcas.compact.gc") +{ + ScopedTemporaryDirectory TempDir; + + const int kIterationCount = 1000; + + std::vector<IoHash> Keys(kIterationCount); + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); + + for (int i = 0; i < kIterationCount; ++i) + { + CbObjectWriter Cbo; + Cbo << "id" << i; + CbObject Obj = Cbo.Save(); + + IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); + const IoHash Hash = HashBuffer(ObjBuffer); + + Cas.InsertChunk(ObjBuffer, Hash); + + Keys[i] = Hash; + } + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Cas.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + } + + // Validate that we can still read the inserted data after closing + // the original cas store + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Cas.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + } +} + +TEST_CASE("compactcas.compact.totalsize") +{ + std::random_device rd; + std::mt19937 g(rd()); + + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + const uint64_t kChunkSize = 1024; + const int32_t kChunkCount = 16; + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateRandomChunk(kChunkSize); + const IoHash Hash = HashBuffer(Chunk); + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + } + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } + + // Re-open again, this time we should have a snapshot + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } + } +} + +TEST_CASE("compactcas.gc.basic") +{ + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); + + IoBuffer Chunk = CreateRandomChunk(128); + IoHash ChunkHash = IoHash::HashBuffer(Chunk); + + const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); + CHECK(InsertResult.New); + Cas.Flush(); + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHash)); +} + +TEST_CASE("compactcas.gc.removefile") +{ + ScopedTemporaryDirectory TempDir; + + IoBuffer Chunk = CreateRandomChunk(128); + IoHash ChunkHash = IoHash::HashBuffer(Chunk); + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); + + const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); + CHECK(InsertResult.New); + const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); + CHECK(!InsertResultDup.New); + Cas.Flush(); + } + + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false); + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHash)); +} + +TEST_CASE("compactcas.gc.compact") +{ + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true); + + uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(9); + for (uint64_t Size : ChunkSizes) + { + Chunks.push_back(CreateRandomChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(9); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New); + CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New); + CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New); + CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New); + CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New); + CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New); + CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New); + CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New); + CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(Cas.HaveChunk(ChunkHashes[1])); + CHECK(Cas.HaveChunk(ChunkHashes[2])); + CHECK(Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + // Keep first and last + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.AddRetainedCids(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Keep last + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.AddRetainedCids(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Keep mixed + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[1]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[7]); + GcCtx.AddRetainedCids(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(!Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + + Cas.InsertChunk(Chunks[0], ChunkHashes[0]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[8], ChunkHashes[8]); + } + + // Keep multiple at end + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[7]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.AddRetainedCids(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[0], ChunkHashes[0]); + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + } + + // Keep every other + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[2]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.AddRetainedCids(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Verify that we nicely appended blocks even after all GC operations + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + } +} + +TEST_CASE("compactcas.gc.deleteblockonopen") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (uint64_t Size : ChunkSizes) + { + Chunks.push_back(CreateRandomChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + // GC every other block + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < 20; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcCtx.AddRetainedCids(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } + } + { + // Re-open + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 1024, 16, false); + + for (size_t i = 0; i < 20; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } +} + +TEST_CASE("compactcas.gc.handleopeniobuffer") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (const uint64_t& Size : ChunkSizes) + { + Chunks.push_back(CreateRandomChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]); + Cas.Flush(); + + // GC everything + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + Cas.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i++) + { + CHECK(!Cas.HaveChunk(ChunkHashes[i])); + } + + CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); +} + +TEST_CASE("compactcas.threadedinsert") +{ + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 4096; + uint64_t ExpectedSize = 0; + + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; + Chunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + while (true) + { + IoBuffer Chunk = CreateRandomChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } + Chunks[Hash] = Chunk; + ExpectedSize += Chunk.Size(); + break; + } + } + + std::atomic<size_t> WorkCompleted = 0; + WorkerThreadPool ThreadPool(4); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); + { + for (const auto& Chunk : Chunks) + { + const IoHash& Hash = Chunk.first; + const IoBuffer& Buffer = Chunk.second; + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); + ZEN_ASSERT(InsertResult.New); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + + WorkCompleted = 0; + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_LE(ExpectedSize, TotalSize); + CHECK_GE(ExpectedSize + 32768, TotalSize); + + { + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Buffer); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + + std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes; + GcChunkHashes.reserve(Chunks.size()); + for (const auto& Chunk : Chunks) + { + GcChunkHashes.insert(Chunk.first); + } + { + WorkCompleted = 0; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks; + NewChunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateRandomChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = Chunk; + } + + std::atomic_uint32_t AddedChunkCount; + + for (const auto& Chunk : NewChunks) + { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { + Cas.InsertChunk(Chunk.second, Chunk.first); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }); + } + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + if (Buffer) + { + CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); + } + WorkCompleted.fetch_add(1); + }); + } + + while (AddedChunkCount.load() < NewChunks.size()) + { + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + if (Cas.HaveChunk(Chunk.first)) + { + GcChunkHashes.emplace(Chunk.first); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + GcCtx.AddRetainedCids(KeepHashes); + Cas.CollectGarbage(GcCtx); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + while (WorkCompleted < NewChunks.size() + Chunks.size()) + { + Sleep(1); + } + + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + if (Cas.HaveChunk(Chunk.first)) + { + GcChunkHashes.emplace(Chunk.first); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + GcCtx.AddRetainedCids(KeepHashes); + Cas.CollectGarbage(GcCtx); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + { + WorkCompleted = 0; + for (const IoHash& ChunkHash : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < GcChunkHashes.size()) + { + Sleep(1); + } + } + } +} + +#endif + +void +compactcas_forcelink() +{ +} + +} // namespace zen |