diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /zenstore/compactcas.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 1511 |
1 files changed, 0 insertions, 1511 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp deleted file mode 100644 index 7b2c21b0f..000000000 --- a/zenstore/compactcas.cpp +++ /dev/null @@ -1,1511 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "compactcas.h" - -#include "cas.h" - -#include <zencore/compress.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zenstore/scrubcontext.h> - -#include <gsl/gsl-lite.hpp> - -#include <xxhash.h> - -#if ZEN_WITH_TESTS -# include <zencore/compactbinarybuilder.h> -# include <zencore/testing.h> -# include <zencore/testutils.h> -# include <zencore/workthreadpool.h> -# include <zenstore/cidstore.h> -# include <algorithm> -# include <random> -#endif - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -struct CasDiskIndexHeader -{ - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t CurrentVersion = 1; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t EntryCount = 0; - uint64_t LogPosition = 0; - uint32_t PayloadAlignment = 0; - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) - { - return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } -}; - -static_assert(sizeof(CasDiskIndexHeader) == 32); - -namespace { - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".ulog"; - - std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return RootPath / ContainerBaseName; - } - - std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); - } - - std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); - } - - std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); - } - - std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return GetBasePath(RootPath, ContainerBaseName) / "blocks"; - } - - bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) - { - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); - return false; - } - if (Entry.Flags & CasDiskIndexEntry::kTombstone) - { - return true; - } - if (Entry.ContentType != ZenContentType::kUnknownContentType) - { - OutReason = - fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); - return false; - } - uint64_t Size = Entry.Location.GetSize(); - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return false; - } - return true; - } - -} // namespace - -////////////////////////////////////////////////////////////////////////// - -CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas")) -{ -} - -CasContainerStrategy::~CasContainerStrategy() -{ -} - -void -CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory, - const std::string_view ContainerBaseName, - uint32_t MaxBlockSize, - uint64_t Alignment, - bool IsNewStore) -{ - ZEN_ASSERT(IsPow2(Alignment)); - ZEN_ASSERT(!m_IsInitialized); - ZEN_ASSERT(MaxBlockSize > 0); - - m_RootDirectory = RootDirectory; - m_ContainerBaseName = ContainerBaseName; - m_PayloadAlignment = Alignment; - m_MaxBlockSize = MaxBlockSize; - m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName); - - OpenContainer(IsNewStore); - - m_IsInitialized = true; -} - -CasStore::InsertResult -CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) -{ - { - RwLock::SharedLockScope _(m_LocationMapLock); - if (m_LocationMap.contains(ChunkHash)) - { - return CasStore::InsertResult{.New = false}; - } - } - - // We can end up in a situation that InsertChunk writes the same chunk data in - // different locations. - // We release the insert lock once we have the correct WriteBlock ready and we know - // where to write the data. If a new InsertChunk request for the same chunk hash/data - // comes in before we update m_LocationMap below we will have a race. - // The outcome of that is that we will write the chunk data in more than one location - // but the chunk hash will only point to one of the chunks. - // We will in that case waste space until the next GC operation. - // - // This should be a rare occasion and the current flow reduces the time we block for - // reads, insert and GC. - - m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) { - BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); - const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; - m_CasLog.Append(IndexEntry); - { - RwLock::ExclusiveLockScope _(m_LocationMapLock); - m_LocationMap.emplace(ChunkHash, DiskLocation); - } - }); - - return CasStore::InsertResult{.New = true}; -} - -CasStore::InsertResult -CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) -{ -#if !ZEN_WITH_TESTS - ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); -#endif - return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); -} - -IoBuffer -CasContainerStrategy::FindChunk(const IoHash& ChunkHash) -{ - RwLock::SharedLockScope _(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt == m_LocationMap.end()) - { - return IoBuffer(); - } - const BlockStoreLocation& Location = KeyIt->second.Get(m_PayloadAlignment); - - IoBuffer Chunk = m_BlockStore.TryGetChunk(Location); - return Chunk; -} - -bool -CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) -{ - RwLock::SharedLockScope _(m_LocationMapLock); - return m_LocationMap.contains(ChunkHash); -} - -void -CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) -{ - // This implementation is good enough for relatively small - // chunk sets (in terms of chunk identifiers), but would - // benefit from a better implementation which removes - // items incrementally for large sets, especially when - // we're likely to already have a large proportion of the - // chunks in the set - - InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); -} - -void -CasContainerStrategy::Flush() -{ - m_BlockStore.Flush(); - m_CasLog.Flush(); - MakeIndexSnapshot(); -} - -void -CasContainerStrategy::Scrub(ScrubContext& Ctx) -{ - std::vector<IoHash> BadKeys; - uint64_t ChunkCount{0}, ChunkBytes{0}; - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkIndexToChunkHash; - - RwLock::SharedLockScope _(m_LocationMapLock); - - uint64_t TotalChunkCount = m_LocationMap.size(); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); - { - for (const auto& Entry : m_LocationMap) - { - const IoHash& ChunkHash = Entry.first; - const BlockStoreDiskLocation& DiskLocation = Entry.second; - BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); - - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash.push_back(ChunkHash); - } - } - - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - BadKeys.push_back(Hash); - return; - } - - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - if (RawHash != Hash) - { - // Hash mismatch - BadKeys.push_back(Hash); - return; - } - return; - } -#if ZEN_WITH_TESTS - IoHash ComputedHash = IoHash::HashBuffer(Data, Size); - if (ComputedHash == Hash) - { - return; - } -#endif - BadKeys.push_back(Hash); - }; - - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - - IoHash RawHash; - uint64_t RawSize; - // TODO: Add API to verify compressed buffer without having to memorymap the whole file - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - if (RawHash != Hash) - { - // Hash mismatch - BadKeys.push_back(Hash); - return; - } - return; - } -#if ZEN_WITH_TESTS - IoHashStream Hasher; - File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); - if (ComputedHash == Hash) - { - return; - } -#endif - BadKeys.push_back(Hash); - }; - - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); - - _.ReleaseNow(); - - Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - - if (!BadKeys.empty()) - { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); - - if (Ctx.RunRecovery()) - { - // Deal with bad chunks by removing them from our lookup map - - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(BadKeys.size()); - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - for (const IoHash& ChunkHash : BadKeys) - { - const auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt == m_LocationMap.end()) - { - // Might have been GC'd - continue; - } - LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); - m_LocationMap.erase(KeyIt); - } - } - m_CasLog.Append(LogEntries); - } - } - - // Let whomever it concerns know about the bad chunks. This could - // be used to invalidate higher level data structures more efficiently - // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadKeys); - - ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); -} - -void -CasContainerStrategy::CollectGarbage(GcContext& GcCtx) -{ - // It collects all the blocks that we want to delete chunks from. For each such - // block we keep a list of chunks to retain and a list of chunks to delete. - // - // If there is a block that we are currently writing to, that block is omitted - // from the garbage collection. - // - // Next it will iterate over all blocks that we want to remove chunks from. - // If the block is empty after removal of chunks we mark the block as pending - // delete - we want to delete it as soon as there are no IoBuffers using the - // block file. - // Once complete we update the m_LocationMap by removing the chunks. - // - // If the block is non-empty we write out the chunks we want to keep to a new - // block file (creating new block files as needed). - // - // We update the index as we complete each new block file. This makes it possible - // to break the GC if we want to limit time for execution. - // - // GC can very parallell to regular operation - it will block while taking - // a snapshot of the current m_LocationMap state and while moving blocks it will - // do a blocking operation and update the m_LocationMap after each new block is - // written and figuring out the path to the next new block. - - ZEN_DEBUG("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); - - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - - LocationMap_t LocationMap; - BlockStore::ReclaimSnapshotState BlockStoreState; - { - RwLock::SharedLockScope ___(m_LocationMapLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - LocationMap = m_LocationMap; - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - } - - uint64_t TotalChunkCount = LocationMap.size(); - - std::vector<IoHash> TotalChunkHashes; - TotalChunkHashes.reserve(TotalChunkCount); - for (const auto& Entry : LocationMap) - { - TotalChunkHashes.push_back(Entry.first); - } - - std::vector<BlockStoreLocation> ChunkLocations; - BlockStore::ChunkIndexArray KeepChunkIndexes; - std::vector<IoHash> ChunkIndexToChunkHash; - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); - - GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = LocationMap.find(ChunkHash); - const BlockStoreDiskLocation& DiskLocation = KeyIt->second; - BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); - size_t ChunkIndex = ChunkLocations.size(); - - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; - if (Keep) - { - KeepChunkIndexes.push_back(ChunkIndex); - } - }); - - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); - if (!PerformDelete) - { - m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); - return; - } - - std::vector<IoHash> DeletedChunks; - m_BlockStore.ReclaimSpace( - BlockStoreState, - ChunkLocations, - KeepChunkIndexes, - m_PayloadAlignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}}); - } - for (const size_t ChunkIndex : RemovedChunks) - { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash]; - LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone}); - DeletedChunks.push_back(ChunkHash); - } - - m_CasLog.Append(LogEntries); - m_CasLog.Flush(); - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - for (const CasDiskIndexEntry& Entry : LogEntries) - { - if (Entry.Flags & CasDiskIndexEntry::kTombstone) - { - m_LocationMap.erase(Entry.Key); - continue; - } - m_LocationMap[Entry.Key] = Entry.Location; - } - } - }, - [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); - - GcCtx.AddDeletedCids(DeletedChunks); -} - -void -CasContainerStrategy::MakeIndexSnapshot() -{ - uint64_t LogCount = m_CasLog.GetLogCount(); - if (m_LogFlushPosition == LogCount) - { - return; - } - - ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_RootDirectory / m_ContainerBaseName, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); - fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(TempIndexPath)) - { - fs::remove(TempIndexPath); - } - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, TempIndexPath); - } - - try - { - // Write the current state of the location map to a new index state - std::vector<CasDiskIndexEntry> Entries; - - { - RwLock::SharedLockScope ___(m_LocationMapLock); - Entries.resize(m_LocationMap.size()); - - uint64_t EntryIndex = 0; - for (auto& Entry : m_LocationMap) - { - CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = Entry.second; - } - } - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - CasDiskIndexHeader Header = {.EntryCount = Entries.size(), - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; - - Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); - - ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - - // Restore any previous snapshot - - if (fs::is_regular_file(TempIndexPath)) - { - fs::remove(IndexPath); - fs::rename(TempIndexPath, IndexPath); - } - } - if (fs::is_regular_file(TempIndexPath)) - { - fs::remove(TempIndexPath); - } -} - -uint64_t -CasContainerStrategy::ReadIndexFile() -{ - std::vector<CasDiskIndexEntry> Entries; - std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); - if (std::filesystem::is_regular_file(IndexPath)) - { - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - Entries.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CasDiskIndexHeader)) - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); - CasDiskIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && - (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && - (Header.EntryCount <= ExpectedEntryCount)) - { - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); - m_PayloadAlignment = Header.PayloadAlignment; - - std::string InvalidEntryReason; - for (const CasDiskIndexEntry& Entry : Entries) - { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - m_LocationMap[Entry.Key] = Entry.Location; - } - - return Header.LogPosition; - } - else - { - ZEN_WARN("skipping invalid index file '{}'", IndexPath); - } - } - } - return 0; -} - -uint64_t -CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) -{ - std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); - if (std::filesystem::is_regular_file(LogPath)) - { - size_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", - m_RootDirectory / m_ContainerBaseName, - LogEntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - TCasLogFile<CasDiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - CasLog.Replay( - [&](const CasDiskIndexEntry& Record) { - LogEntryCount++; - std::string InvalidEntryReason; - if (Record.Flags & CasDiskIndexEntry::kTombstone) - { - m_LocationMap.erase(Record.Key); - return; - } - if (!ValidateEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - return; - } - m_LocationMap[Record.Key] = Record.Location; - }, - SkipEntryCount); - return LogEntryCount; - } - } - return 0; -} - -void -CasContainerStrategy::OpenContainer(bool IsNewStore) -{ - // Add .running file and delete on clean on close to detect bad termination - - m_LocationMap.clear(); - - std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName); - - if (IsNewStore) - { - std::filesystem::remove_all(BasePath); - } - - m_LogFlushPosition = ReadIndexFile(); - uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); - - CreateDirectories(BasePath); - - std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); - m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - - std::vector<BlockStoreLocation> KnownLocations; - KnownLocations.reserve(m_LocationMap.size()); - for (const auto& Entry : m_LocationMap) - { - const BlockStoreDiskLocation& Location = Entry.second; - KnownLocations.push_back(Location.Get(m_PayloadAlignment)); - } - - m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); - - if (IsNewStore || (LogEntryCount > 0)) - { - MakeIndexSnapshot(); - } - - // TODO: should validate integrity of container files here -} - -////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -namespace { - static IoBuffer CreateRandomChunk(uint64_t Size) - { - static std::random_device rd; - static std::mt19937 g(rd()); - - std::vector<uint8_t> Values; - Values.resize(Size); - for (size_t Idx = 0; Idx < Size; ++Idx) - { - Values[Idx] = static_cast<uint8_t>(Idx); - } - std::shuffle(Values.begin(), Values.end(), g); - - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); - } -} // namespace - -TEST_CASE("compactcas.hex") -{ - uint32_t Value; - std::string HexString; - CHECK(!ParseHexNumber("", Value)); - char Hex[9]; - - ToHexNumber(0u, Hex); - HexString = std::string(Hex); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == 0u); - - ToHexNumber(std::numeric_limits<std::uint32_t>::max(), Hex); - HexString = std::string(Hex); - CHECK(HexString == "ffffffff"); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == std::numeric_limits<std::uint32_t>::max()); - - ToHexNumber(0xadf14711u, Hex); - HexString = std::string(Hex); - CHECK(HexString == "adf14711"); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == 0xadf14711u); - - ToHexNumber(0x80000000u, Hex); - HexString = std::string(Hex); - CHECK(HexString == "80000000"); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == 0x80000000u); - - ToHexNumber(0x718293a4u, Hex); - HexString = std::string(Hex); - CHECK(HexString == "718293a4"); - CHECK(ParseHexNumber(HexString, Value)); - CHECK(Value == 0x718293a4u); -} - -TEST_CASE("compactcas.compact.gc") -{ - ScopedTemporaryDirectory TempDir; - - const int kIterationCount = 1000; - - std::vector<IoHash> Keys(kIterationCount); - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); - - for (int i = 0; i < kIterationCount; ++i) - { - CbObjectWriter Cbo; - Cbo << "id" << i; - CbObject Obj = Cbo.Save(); - - IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); - const IoHash Hash = HashBuffer(ObjBuffer); - - Cas.InsertChunk(ObjBuffer, Hash); - - Keys[i] = Hash; - } - - for (int i = 0; i < kIterationCount; ++i) - { - IoBuffer Chunk = Cas.FindChunk(Keys[i]); - - CHECK(!!Chunk); - - CbObject Value = LoadCompactBinaryObject(Chunk); - - CHECK_EQ(Value["id"].AsInt32(), i); - } - } - - // Validate that we can still read the inserted data after closing - // the original cas store - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); - - for (int i = 0; i < kIterationCount; ++i) - { - IoBuffer Chunk = Cas.FindChunk(Keys[i]); - - CHECK(!!Chunk); - - CbObject Value = LoadCompactBinaryObject(Chunk); - - CHECK_EQ(Value["id"].AsInt32(), i); - } - } -} - -TEST_CASE("compactcas.compact.totalsize") -{ - std::random_device rd; - std::mt19937 g(rd()); - - // for (uint32_t i = 0; i < 100; ++i) - { - ScopedTemporaryDirectory TempDir; - - const uint64_t kChunkSize = 1024; - const int32_t kChunkCount = 16; - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - IoBuffer Chunk = CreateRandomChunk(kChunkSize); - const IoHash Hash = HashBuffer(Chunk); - CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); - } - - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - } - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); - - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - } - - // Re-open again, this time we should have a snapshot - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); - - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - } - } -} - -TEST_CASE("compactcas.gc.basic") -{ - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); - - IoBuffer Chunk = CreateRandomChunk(128); - IoHash ChunkHash = IoHash::HashBuffer(Chunk); - - const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); - CHECK(InsertResult.New); - Cas.Flush(); - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHash)); -} - -TEST_CASE("compactcas.gc.removefile") -{ - ScopedTemporaryDirectory TempDir; - - IoBuffer Chunk = CreateRandomChunk(128); - IoHash ChunkHash = IoHash::HashBuffer(Chunk); - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); - - const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); - CHECK(InsertResult.New); - const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); - CHECK(!InsertResultDup.New); - Cas.Flush(); - } - - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false); - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHash)); -} - -TEST_CASE("compactcas.gc.compact") -{ - // for (uint32_t i = 0; i < 100; ++i) - { - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true); - - uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; - std::vector<IoBuffer> Chunks; - Chunks.reserve(9); - for (uint64_t Size : ChunkSizes) - { - Chunks.push_back(CreateRandomChunk(Size)); - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(9); - for (const IoBuffer& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New); - CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New); - CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New); - CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New); - CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New); - CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New); - CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New); - CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New); - CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New); - - CHECK(Cas.HaveChunk(ChunkHashes[0])); - CHECK(Cas.HaveChunk(ChunkHashes[1])); - CHECK(Cas.HaveChunk(ChunkHashes[2])); - CHECK(Cas.HaveChunk(ChunkHashes[3])); - CHECK(Cas.HaveChunk(ChunkHashes[4])); - CHECK(Cas.HaveChunk(ChunkHashes[5])); - CHECK(Cas.HaveChunk(ChunkHashes[6])); - CHECK(Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - // Keep first and last - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[0]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(Cas.HaveChunk(ChunkHashes[0])); - CHECK(!Cas.HaveChunk(ChunkHashes[1])); - CHECK(!Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(!Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(!Cas.HaveChunk(ChunkHashes[6])); - CHECK(!Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - - Cas.InsertChunk(Chunks[1], ChunkHashes[1]); - Cas.InsertChunk(Chunks[2], ChunkHashes[2]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[4], ChunkHashes[4]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - Cas.InsertChunk(Chunks[6], ChunkHashes[6]); - Cas.InsertChunk(Chunks[7], ChunkHashes[7]); - } - - // Keep last - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHashes[0])); - CHECK(!Cas.HaveChunk(ChunkHashes[1])); - CHECK(!Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(!Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(!Cas.HaveChunk(ChunkHashes[6])); - CHECK(!Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - - Cas.InsertChunk(Chunks[1], ChunkHashes[1]); - Cas.InsertChunk(Chunks[2], ChunkHashes[2]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[4], ChunkHashes[4]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - Cas.InsertChunk(Chunks[6], ChunkHashes[6]); - Cas.InsertChunk(Chunks[7], ChunkHashes[7]); - } - - // Keep mixed - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[1]); - KeepChunks.push_back(ChunkHashes[4]); - KeepChunks.push_back(ChunkHashes[7]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHashes[0])); - CHECK(Cas.HaveChunk(ChunkHashes[1])); - CHECK(!Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(!Cas.HaveChunk(ChunkHashes[6])); - CHECK(Cas.HaveChunk(ChunkHashes[7])); - CHECK(!Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); - - Cas.InsertChunk(Chunks[0], ChunkHashes[0]); - Cas.InsertChunk(Chunks[2], ChunkHashes[2]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - Cas.InsertChunk(Chunks[6], ChunkHashes[6]); - Cas.InsertChunk(Chunks[8], ChunkHashes[8]); - } - - // Keep multiple at end - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[6]); - KeepChunks.push_back(ChunkHashes[7]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(!Cas.HaveChunk(ChunkHashes[0])); - CHECK(!Cas.HaveChunk(ChunkHashes[1])); - CHECK(!Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(!Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(Cas.HaveChunk(ChunkHashes[6])); - CHECK(Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - - Cas.InsertChunk(Chunks[0], ChunkHashes[0]); - Cas.InsertChunk(Chunks[1], ChunkHashes[1]); - Cas.InsertChunk(Chunks[2], ChunkHashes[2]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[4], ChunkHashes[4]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - } - - // Keep every other - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[0]); - KeepChunks.push_back(ChunkHashes[2]); - KeepChunks.push_back(ChunkHashes[4]); - KeepChunks.push_back(ChunkHashes[6]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - CHECK(Cas.HaveChunk(ChunkHashes[0])); - CHECK(!Cas.HaveChunk(ChunkHashes[1])); - CHECK(Cas.HaveChunk(ChunkHashes[2])); - CHECK(!Cas.HaveChunk(ChunkHashes[3])); - CHECK(Cas.HaveChunk(ChunkHashes[4])); - CHECK(!Cas.HaveChunk(ChunkHashes[5])); - CHECK(Cas.HaveChunk(ChunkHashes[6])); - CHECK(!Cas.HaveChunk(ChunkHashes[7])); - CHECK(Cas.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - - Cas.InsertChunk(Chunks[1], ChunkHashes[1]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - Cas.InsertChunk(Chunks[7], ChunkHashes[7]); - } - - // Verify that we nicely appended blocks even after all GC operations - CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); - CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - } -} - -TEST_CASE("compactcas.gc.deleteblockonopen") -{ - ScopedTemporaryDirectory TempDir; - - uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; - std::vector<IoBuffer> Chunks; - Chunks.reserve(20); - for (uint64_t Size : ChunkSizes) - { - Chunks.push_back(CreateRandomChunk(Size)); - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(20); - for (const IoBuffer& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - { - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); - - for (size_t i = 0; i < 20; i++) - { - CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); - } - - // GC every other block - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - for (size_t i = 0; i < 20; i += 2) - { - KeepChunks.push_back(ChunkHashes[i]); - } - GcCtx.AddRetainedCids(KeepChunks); - - Cas.Flush(); - Cas.CollectGarbage(GcCtx); - - for (size_t i = 0; i < 20; i += 2) - { - CHECK(Cas.HaveChunk(ChunkHashes[i])); - CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); - CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); - } - } - } - { - // Re-open - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 1024, 16, false); - - for (size_t i = 0; i < 20; i += 2) - { - CHECK(Cas.HaveChunk(ChunkHashes[i])); - CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); - CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); - } - } -} - -TEST_CASE("compactcas.gc.handleopeniobuffer") -{ - ScopedTemporaryDirectory TempDir; - - uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; - std::vector<IoBuffer> Chunks; - Chunks.reserve(20); - for (const uint64_t& Size : ChunkSizes) - { - Chunks.push_back(CreateRandomChunk(Size)); - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(20); - for (const IoBuffer& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); - - for (size_t i = 0; i < 20; i++) - { - CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); - } - - IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]); - Cas.Flush(); - - // GC everything - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - Cas.CollectGarbage(GcCtx); - - for (size_t i = 0; i < 20; i++) - { - CHECK(!Cas.HaveChunk(ChunkHashes[i])); - } - - CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); -} - -TEST_CASE("compactcas.threadedinsert") -{ - // for (uint32_t i = 0; i < 100; ++i) - { - ScopedTemporaryDirectory TempDir; - - const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 4096; - uint64_t ExpectedSize = 0; - - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; - Chunks.reserve(kChunkCount); - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - while (true) - { - IoBuffer Chunk = CreateRandomChunk(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - if (Chunks.contains(Hash)) - { - continue; - } - Chunks[Hash] = Chunk; - ExpectedSize += Chunk.Size(); - break; - } - } - - std::atomic<size_t> WorkCompleted = 0; - WorkerThreadPool ThreadPool(4); - GcManager Gc; - CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); - { - for (const auto& Chunk : Chunks) - { - const IoHash& Hash = Chunk.first; - const IoBuffer& Buffer = Chunk.second; - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); - ZEN_ASSERT(InsertResult.New); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - } - - WorkCompleted = 0; - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_LE(ExpectedSize, TotalSize); - CHECK_GE(ExpectedSize + 32768, TotalSize); - - { - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - IoHash Hash = IoHash::HashBuffer(Buffer); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - } - - std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes; - GcChunkHashes.reserve(Chunks.size()); - for (const auto& Chunk : Chunks) - { - GcChunkHashes.insert(Chunk.first); - } - { - WorkCompleted = 0; - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks; - NewChunks.reserve(kChunkCount); - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - IoBuffer Chunk = CreateRandomChunk(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - NewChunks[Hash] = Chunk; - } - - std::atomic_uint32_t AddedChunkCount; - - for (const auto& Chunk : NewChunks) - { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { - Cas.InsertChunk(Chunk.second, Chunk.first); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); - } - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - if (Buffer) - { - CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); - } - WorkCompleted.fetch_add(1); - }); - } - - while (AddedChunkCount.load() < NewChunks.size()) - { - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) - { - if (Cas.HaveChunk(Chunk.first)) - { - GcChunkHashes.emplace(Chunk.first); - } - } - std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - GcCtx.AddRetainedCids(KeepHashes); - Cas.CollectGarbage(GcCtx); - const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); - } - - while (WorkCompleted < NewChunks.size() + Chunks.size()) - { - Sleep(1); - } - - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) - { - if (Cas.HaveChunk(Chunk.first)) - { - GcChunkHashes.emplace(Chunk.first); - } - } - std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - GcCtx.AddRetainedCids(KeepHashes); - Cas.CollectGarbage(GcCtx); - const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); - } - { - WorkCompleted = 0; - for (const IoHash& ChunkHash : GcChunkHashes) - { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { - CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < GcChunkHashes.size()) - { - Sleep(1); - } - } - } -} - -#endif - -void -compactcas_forcelink() -{ -} - -} // namespace zen |