aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
author Stefan Boberg <[email protected]> 2023-05-02 10:01:47 +0200
committer GitHub <[email protected]> 2023-05-02 10:01:47 +0200
commit 075d17f8ada47e990fe94606c3d21df409223465 (patch)
tree e50549b766a2f3c354798a54ff73404217b4c9af /zenstore/compactcas.cpp
parent fix: bundle shouldn't append content zip to zen (diff)
download zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r-- zenstore/compactcas.cpp 1511
1 files changed, 0 insertions, 1511 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
deleted file mode 100644
index 7b2c21b0f..000000000
--- a/zenstore/compactcas.cpp
+++ /dev/null
@@ -1,1511 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "compactcas.h"
-
-#include "cas.h"
-
-#include <zencore/compress.h>
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zenstore/scrubcontext.h>
-
-#include <gsl/gsl-lite.hpp>
-
-#include <xxhash.h>
-
-#if ZEN_WITH_TESTS
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-# include <zencore/workthreadpool.h>
-# include <zenstore/cidstore.h>
-# include <algorithm>
-# include <random>
-#endif
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-
-// Fixed-size header describing an index snapshot file. MakeIndexSnapshot() writes
-// it at offset 0 of the index file and ReadIndexFile() validates it on load.
-// The size is pinned to 32 bytes by the static_assert that follows the struct, so
-// the member order and types below are part of the file format.
-struct CasDiskIndexHeader
-{
- static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
- static constexpr uint32_t CurrentVersion = 1;
-
- // Format identification; ReadIndexFile() rejects files whose Magic/Version differ.
- uint32_t Magic = ExpectedMagic;
- uint32_t Version = CurrentVersion;
- // Number of CasDiskIndexEntry records stored after the header.
- uint64_t EntryCount = 0;
- // Log entry count at snapshot time; ReadLog() skips this many log entries on load.
- uint64_t LogPosition = 0;
- // Payload alignment the stored locations were encoded with.
- uint32_t PayloadAlignment = 0;
- // XXH32 over all preceding bytes of the header (see ComputeChecksum).
- uint32_t Checksum = 0;
-
- // Hashes the header starting at Magic, covering every byte except the trailing
- // 4-byte Checksum field, using a fixed seed.
- static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header)
- {
- return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
- }
-};
-
-static_assert(sizeof(CasDiskIndexHeader) == 32);
-
-namespace {
-    // File name suffixes for the index snapshot and the append-only log.
-    const char* IndexExtension = ".uidx";
-    const char* LogExtension = ".ulog";
-
-    // Directory holding all files of one container: <RootPath>/<ContainerBaseName>
-    std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
-    {
-        std::filesystem::path ContainerDir = RootPath;
-        ContainerDir /= ContainerBaseName;
-        return ContainerDir;
-    }
-
-    // Index snapshot file: <base>/<ContainerBaseName>.uidx
-    std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
-    {
-        const std::string FileName = ContainerBaseName + IndexExtension;
-        return GetBasePath(RootPath, ContainerBaseName) / FileName;
-    }
-
-    // Temporary name the previous index snapshot is parked under while a new one is written.
-    // NOTE(review): this is built from LogExtension (".tmp.ulog") although it names an index
-    // file - confirm whether IndexExtension was intended.
-    std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
-    {
-        const std::string FileName = ContainerBaseName + ".tmp" + LogExtension;
-        return GetBasePath(RootPath, ContainerBaseName) / FileName;
-    }
-
-    // Append-only log file: <base>/<ContainerBaseName>.ulog
-    std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
-    {
-        const std::string FileName = ContainerBaseName + LogExtension;
-        return GetBasePath(RootPath, ContainerBaseName) / FileName;
-    }
-
-    // Directory passed to the block store: <base>/blocks
-    std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
-    {
-        std::filesystem::path BlocksDir = GetBasePath(RootPath, ContainerBaseName);
-        BlocksDir /= "blocks";
-        return BlocksDir;
-    }
-
-    // Sanity-checks one index/log record.
-    // Returns true when the record is usable; otherwise returns false and stores a
-    // human readable explanation in OutReason. Checks run in this order: zero key,
-    // unknown flag bits, tombstone (accepted without further checks), content type,
-    // zero payload size.
-    bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason)
-    {
-        const auto Reject = [&OutReason](std::string Reason) {
-            OutReason = std::move(Reason);
-            return false;
-        };
-
-        if (Entry.Key == IoHash::Zero)
-        {
-            return Reject(fmt::format("Invalid hash key {}", Entry.Key.ToHexString()));
-        }
-
-        const bool HasUnknownFlags = (Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0;
-        if (HasUnknownFlags)
-        {
-            return Reject(fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()));
-        }
-
-        // Tombstones only need a valid key and flags.
-        if (Entry.Flags & CasDiskIndexEntry::kTombstone)
-        {
-            return true;
-        }
-
-        if (Entry.ContentType != ZenContentType::kUnknownContentType)
-        {
-            return Reject(
-                fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()));
-        }
-
-        const uint64_t PayloadSize = Entry.Location.GetSize();
-        if (PayloadSize == 0)
-        {
-            return Reject(fmt::format("Invalid size {} for entry {}", PayloadSize, Entry.Key.ToHexString()));
-        }
-
-        return true;
-    }
-
-} // namespace
-
-//////////////////////////////////////////////////////////////////////////
-
-// Passes the GC manager to the GcStorage base and binds the "containercas" logger.
-// The store is not usable until Initialize() has been called.
-CasContainerStrategy::CasContainerStrategy(GcManager& Gc)
-: GcStorage(Gc)
-, m_Log(logging::Get("containercas"))
-{
-}
-
-// No explicit teardown is performed here.
-CasContainerStrategy::~CasContainerStrategy() = default;
-
-// One-time setup of the store.
-//
-// RootDirectory      - parent directory; the container lives in RootDirectory/ContainerBaseName
-// ContainerBaseName  - name used for the container directory and its index/log files
-// MaxBlockSize       - forwarded to the block store; must be non-zero
-// Alignment          - payload alignment; must be a power of two
-// IsNewStore         - when true, OpenContainer() removes any existing container directory first
-//
-// Asserts if called a second time.
-void
-CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory,
-                                 const std::string_view ContainerBaseName,
-                                 uint32_t MaxBlockSize,
-                                 uint64_t Alignment,
-                                 bool IsNewStore)
-{
-    ZEN_ASSERT(IsPow2(Alignment));
-    ZEN_ASSERT(!m_IsInitialized);
-    ZEN_ASSERT(MaxBlockSize > 0);
-
-    // Record the configuration before anything touches the disk.
-    m_MaxBlockSize = MaxBlockSize;
-    m_PayloadAlignment = Alignment;
-    m_ContainerBaseName = ContainerBaseName;
-    m_RootDirectory = RootDirectory;
-
-    // Derived from the two members assigned just above.
-    m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName);
-
-    OpenContainer(IsNewStore);
-
-    m_IsInitialized = true;
-}
-
-// Stores a chunk under ChunkHash unless the location map already has that hash.
-// Returns InsertResult{.New = false} when the hash was already present, otherwise
-// writes the data to the block store, appends a log record, publishes the location
-// and returns InsertResult{.New = true}.
-CasStore::InsertResult
-CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
-{
-    // Fast path: nothing to do if the hash is already known.
-    {
-        RwLock::SharedLockScope ReadLock(m_LocationMapLock);
-        if (m_LocationMap.find(ChunkHash) != m_LocationMap.end())
-        {
-            return CasStore::InsertResult{.New = false};
-        }
-    }
-
-    // The check above and the map update below are not atomic. Two concurrent
-    // InsertChunk calls for the same hash can both pass the check and both write
-    // their data, ending up in different locations. Only one of those locations
-    // stays referenced by the map; the other copy is wasted space until the next
-    // GC operation.
-    //
-    // This is expected to be rare, and in exchange reads, inserts and GC are
-    // blocked for a shorter time.
-
-    const auto OnChunkWritten = [&](const BlockStoreLocation& Location) {
-        BlockStoreDiskLocation PackedLocation(Location, m_PayloadAlignment);
-
-        // Log first, then make the chunk visible to lookups.
-        const CasDiskIndexEntry LogRecord{.Key = ChunkHash, .Location = PackedLocation};
-        m_CasLog.Append(LogRecord);
-
-        RwLock::ExclusiveLockScope WriteLock(m_LocationMapLock);
-        m_LocationMap.emplace(ChunkHash, PackedLocation);
-    };
-
-    m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, OnChunkWritten);
-
-    return CasStore::InsertResult{.New = true};
-}
-
-// IoBuffer convenience overload; forwards the buffer's bytes to the raw-pointer
-// overload. Outside of test builds the buffer must be tagged as compressed binary.
-CasStore::InsertResult
-CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
-{
-#if !ZEN_WITH_TESTS
-    ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
-#endif
-    const void* Payload = Chunk.Data();
-    const size_t PayloadSize = Chunk.Size();
-    return InsertChunk(Payload, PayloadSize, ChunkHash);
-}
-
-// Looks up ChunkHash and returns whatever the block store yields for its location.
-// Returns an empty IoBuffer when the hash is not in the location map.
-// The shared map lock is held for the whole call, including the block store lookup.
-IoBuffer
-CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
-{
-    RwLock::SharedLockScope ReadLock(m_LocationMapLock);
-
-    const auto Found = m_LocationMap.find(ChunkHash);
-    if (Found != m_LocationMap.end())
-    {
-        const BlockStoreLocation Location = Found->second.Get(m_PayloadAlignment);
-        return m_BlockStore.TryGetChunk(Location);
-    }
-
-    return IoBuffer();
-}
-
-// Returns true when ChunkHash is present in the location map (checked under the shared lock).
-bool
-CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
-{
-    RwLock::SharedLockScope ReadLock(m_LocationMapLock);
-    return m_LocationMap.find(ChunkHash) != m_LocationMap.end();
-}
-
-// Removes from InOutChunks every hash this store already holds.
-void
-CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
-{
-    // Each hash is checked with a separate HaveChunk() call. That is good enough
-    // for relatively small sets of chunk identifiers; large sets - especially ones
-    // where most chunks are already present - would benefit from an implementation
-    // that removes items incrementally.
-
-    const auto AlreadyStored = [this](const IoHash& Hash) { return HaveChunk(Hash); };
-    InOutChunks.RemoveHashesIf(AlreadyStored);
-}
-
-// Flushes the block store and the log, then writes a fresh index snapshot.
-// MakeIndexSnapshot() returns early when no log entries were added since the
-// last snapshot.
-void
-CasContainerStrategy::Flush()
-{
- m_BlockStore.Flush();
- m_CasLog.Flush();
- MakeIndexSnapshot();
-}
-
-// Validates every chunk referenced by the location map and reports the result
-// to Ctx.
-//
-// A chunk is considered good when its compressed-buffer header validates and the
-// raw hash in that header equals the chunk's key. In test builds a chunk is also
-// accepted when the hash of its stored bytes equals the key. Everything else,
-// including chunks the block store cannot supply data for, is collected in BadKeys.
-//
-// When Ctx.RunRecovery() is true the bad chunks are removed from the location map
-// and tombstones are appended to the log. Bad keys are always passed to
-// Ctx.ReportBadCidChunks().
-void
-CasContainerStrategy::Scrub(ScrubContext& Ctx)
-{
- std::vector<IoHash> BadKeys;
- uint64_t ChunkCount{0}, ChunkBytes{0};
- std::vector<BlockStoreLocation> ChunkLocations;
- std::vector<IoHash> ChunkIndexToChunkHash;
-
- // Held until the explicit ReleaseNow() after IterateChunks below.
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- // Build two parallel arrays: ChunkLocations[i] is the location of the chunk
- // whose key is ChunkIndexToChunkHash[i].
- uint64_t TotalChunkCount = m_LocationMap.size();
- ChunkLocations.reserve(TotalChunkCount);
- ChunkIndexToChunkHash.reserve(TotalChunkCount);
- {
- for (const auto& Entry : m_LocationMap)
- {
- const IoHash& ChunkHash = Entry.first;
- const BlockStoreDiskLocation& DiskLocation = Entry.second;
- BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
-
- ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash.push_back(ChunkHash);
- }
- }
-
- // Callback for chunks that are handed over as an in-memory range.
- const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
-
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- if (!Data)
- {
- // ChunkLocation out of range of stored blocks
- BadKeys.push_back(Hash);
- return;
- }
-
- IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- if (RawHash != Hash)
- {
- // Hash mismatch
- BadKeys.push_back(Hash);
- return;
- }
- return;
- }
-#if ZEN_WITH_TESTS
- // Test builds also store uncompressed payloads keyed by the hash of their bytes.
- IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
- if (ComputedHash == Hash)
- {
- return;
- }
-#endif
- BadKeys.push_back(Hash);
- };
-
- // Callback for chunks that are handed over as a byte range of a block file.
- const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
-
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
-
- IoHash RawHash;
- uint64_t RawSize;
- // TODO: Add API to verify compressed buffer without having to memorymap the whole file
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- if (RawHash != Hash)
- {
- // Hash mismatch
- BadKeys.push_back(Hash);
- return;
- }
- return;
- }
-#if ZEN_WITH_TESTS
- // Same test-only fallback as above, hashing the file range in pieces.
- IoHashStream Hasher;
- File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
- if (ComputedHash == Hash)
- {
- return;
- }
-#endif
- BadKeys.push_back(Hash);
- };
-
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
-
- // Drop the shared lock; the recovery step below takes the exclusive lock.
- _.ReleaseNow();
-
- Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
-
- if (!BadKeys.empty())
- {
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
-
- if (Ctx.RunRecovery())
- {
- // Deal with bad chunks by removing them from our lookup map
-
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(BadKeys.size());
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- for (const IoHash& ChunkHash : BadKeys)
- {
- const auto KeyIt = m_LocationMap.find(ChunkHash);
- if (KeyIt == m_LocationMap.end())
- {
- // Might have been GC'd
- continue;
- }
- LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
- m_LocationMap.erase(KeyIt);
- }
- }
- // The tombstones are appended after the exclusive lock has been released.
- m_CasLog.Append(LogEntries);
- }
- }
-
- // Let whomever it concerns know about the bad chunks. This could
- // be used to invalidate higher level data structures more efficiently
- // than a full validation pass might be able to do
- Ctx.ReportBadCidChunks(BadKeys);
-
- ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
-}
-
-// Removes chunks that GcCtx does not ask to keep and compacts the blocks they lived in.
-//
-// A snapshot of the location map and of the block store state is taken under the
-// shared lock. GcCtx.FilterCids() then classifies every chunk in the snapshot as
-// keep/drop, and BlockStore::ReclaimSpace() performs the actual work. When deletion
-// is not enabled (GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects() is false)
-// ReclaimSpace is invoked with its dry-run flag set and nothing is changed here.
-// Otherwise the callback below logs moved/removed chunks, applies them to
-// m_LocationMap, and the removed hashes are reported via GcCtx.AddDeletedCids().
-void
-CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
-{
-    // It collects all the blocks that we want to delete chunks from. For each such
-    // block we keep a list of chunks to retain and a list of chunks to delete.
-    //
-    // If there is a block that we are currently writing to, that block is omitted
-    // from the garbage collection.
-    //
-    // Next it will iterate over all blocks that we want to remove chunks from.
-    // If the block is empty after removal of chunks we mark the block as pending
-    // delete - we want to delete it as soon as there are no IoBuffers using the
-    // block file.
-    // Once complete we update the m_LocationMap by removing the chunks.
-    //
-    // If the block is non-empty we write out the chunks we want to keep to a new
-    // block file (creating new block files as needed).
-    //
-    // We update the index as we complete each new block file. This makes it possible
-    // to break the GC if we want to limit time for execution.
-    //
-    // GC can run in parallel with regular operation - it will block while taking
-    // a snapshot of the current m_LocationMap state and while moving blocks it will
-    // do a blocking operation and update the m_LocationMap after each new block is
-    // written and figuring out the path to the next new block.
-
-    ZEN_DEBUG("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName);
-
-    // Time spent holding m_LocationMapLock.
-    // NOTE(review): the snapshot (shared lock) time is accumulated into the Write*
-    // counters and the index update (exclusive lock) time into the Read* counters;
-    // none of the four values is reported anywhere in this function.
-    uint64_t WriteBlockTimeUs = 0;
-    uint64_t WriteBlockLongestTimeUs = 0;
-    uint64_t ReadBlockTimeUs = 0;
-    uint64_t ReadBlockLongestTimeUs = 0;
-
-    LocationMap_t LocationMap;
-    BlockStore::ReclaimSnapshotState BlockStoreState;
-    {
-        RwLock::SharedLockScope ___(m_LocationMapLock);
-        Stopwatch Timer;
-        const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
-            uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
-            WriteBlockTimeUs += ElapsedUs;
-            WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
-        });
-        LocationMap = m_LocationMap;
-        BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
-    }
-
-    uint64_t TotalChunkCount = LocationMap.size();
-
-    std::vector<IoHash> TotalChunkHashes;
-    TotalChunkHashes.reserve(TotalChunkCount);
-    for (const auto& Entry : LocationMap)
-    {
-        TotalChunkHashes.push_back(Entry.first);
-    }
-
-    // Parallel arrays indexed by "chunk index": ChunkLocations[i] belongs to the
-    // chunk whose key is ChunkIndexToChunkHash[i]. KeepChunkIndexes lists the
-    // indexes that must survive.
-    std::vector<BlockStoreLocation> ChunkLocations;
-    BlockStore::ChunkIndexArray KeepChunkIndexes;
-    std::vector<IoHash> ChunkIndexToChunkHash;
-    ChunkLocations.reserve(TotalChunkCount);
-    ChunkIndexToChunkHash.reserve(TotalChunkCount);
-
-    GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
-        auto KeyIt = LocationMap.find(ChunkHash);
-        const BlockStoreDiskLocation& DiskLocation = KeyIt->second;
-        BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
-        size_t ChunkIndex = ChunkLocations.size();
-
-        // Both vectors were only reserve()d, so they have to grow through push_back;
-        // indexing past size() would be an out-of-bounds write. Pushing to both in
-        // lock-step keeps ChunkIndex valid for either array.
-        ChunkLocations.push_back(Location);
-        ChunkIndexToChunkHash.push_back(ChunkHash);
-        if (Keep)
-        {
-            KeepChunkIndexes.push_back(ChunkIndex);
-        }
-    });
-
-    const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
-    if (!PerformDelete)
-    {
-        m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
-        return;
-    }
-
-    std::vector<IoHash> DeletedChunks;
-    m_BlockStore.ReclaimSpace(
-        BlockStoreState,
-        ChunkLocations,
-        KeepChunkIndexes,
-        m_PayloadAlignment,
-        false,
-        [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
-            // Translate the block store's report into log records: a plain record
-            // with the new location for each moved chunk, a tombstone for each
-            // removed chunk.
-            std::vector<CasDiskIndexEntry> LogEntries;
-            LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
-            for (const auto& Entry : MovedChunks)
-            {
-                size_t ChunkIndex = Entry.first;
-                const BlockStoreLocation& NewLocation = Entry.second;
-                const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
-                LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}});
-            }
-            for (const size_t ChunkIndex : RemovedChunks)
-            {
-                const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
-                const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash];
-                LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone});
-                DeletedChunks.push_back(ChunkHash);
-            }
-
-            // Persist the changes before applying them to the live map.
-            m_CasLog.Append(LogEntries);
-            m_CasLog.Flush();
-            {
-                RwLock::ExclusiveLockScope __(m_LocationMapLock);
-                Stopwatch Timer;
-                const auto ____ = MakeGuard([&] {
-                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
-                    ReadBlockTimeUs += ElapsedUs;
-                    ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
-                });
-                for (const CasDiskIndexEntry& Entry : LogEntries)
-                {
-                    if (Entry.Flags & CasDiskIndexEntry::kTombstone)
-                    {
-                        m_LocationMap.erase(Entry.Key);
-                        continue;
-                    }
-                    m_LocationMap[Entry.Key] = Entry.Location;
-                }
-            }
-        },
-        [&GcCtx]() { return GcCtx.CollectSmallObjects(); });
-
-    GcCtx.AddDeletedCids(DeletedChunks);
-}
-
-// Writes the current location map to the index file as a CasDiskIndexHeader
-// followed by one CasDiskIndexEntry per map entry.
-//
-// Does nothing when the log has not grown since the last snapshot. Any existing
-// index file is renamed to the temp index path before writing and is restored from
-// there if writing throws; the temp file is removed at the end in either case.
-// Exceptions derived from std::exception raised while writing are logged and swallowed.
-void
-CasContainerStrategy::MakeIndexSnapshot()
-{
-    uint64_t LogCount = m_CasLog.GetLogCount();
-    if (m_LogFlushPosition == LogCount)
-    {
-        return;
-    }
-
-    ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName);
-    uint64_t EntryCount = 0;
-    Stopwatch Timer;
-    const auto _ = MakeGuard([&] {
-        ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
-                 m_RootDirectory / m_ContainerBaseName,
-                 EntryCount,
-                 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-    });
-
-    namespace fs = std::filesystem;
-
-    fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
-    fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName);
-
-    // Move index away, we keep it if something goes wrong
-    if (fs::is_regular_file(TempIndexPath))
-    {
-        fs::remove(TempIndexPath);
-    }
-    if (fs::is_regular_file(IndexPath))
-    {
-        fs::rename(IndexPath, TempIndexPath);
-    }
-
-    try
-    {
-        // Write the current state of the location map to a new index state
-        std::vector<CasDiskIndexEntry> Entries;
-
-        {
-            // Copy under the shared lock; the file is written after it is released.
-            RwLock::SharedLockScope ___(m_LocationMapLock);
-            Entries.resize(m_LocationMap.size());
-
-            uint64_t EntryIndex = 0;
-            for (auto& Entry : m_LocationMap)
-            {
-                CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++];
-                IndexEntry.Key = Entry.first;
-                IndexEntry.Location = Entry.second;
-            }
-        }
-
-        BasicFile ObjectIndexFile;
-        ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
-        CasDiskIndexHeader Header = {.EntryCount = Entries.size(),
-                                     .LogPosition = LogCount,
-                                     .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
-
-        Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header);
-
-        // The header occupies the first sizeof(CasDiskIndexHeader) bytes and the
-        // entries follow directly after it - the same layout ReadIndexFile() reads.
-        ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexHeader), 0);
-        ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
-        ObjectIndexFile.Flush();
-        ObjectIndexFile.Close();
-        EntryCount = Entries.size();
-        m_LogFlushPosition = LogCount;
-    }
-    catch (const std::exception& Err)
-    {
-        ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
-
-        // Restore any previous snapshot
-
-        if (fs::is_regular_file(TempIndexPath))
-        {
-            fs::remove(IndexPath);
-            fs::rename(TempIndexPath, IndexPath);
-        }
-    }
-    // On success the parked previous snapshot is no longer needed; after a restore
-    // the temp file has already been renamed away and this is a no-op.
-    if (fs::is_regular_file(TempIndexPath))
-    {
-        fs::remove(TempIndexPath);
-    }
-}
-
-// Loads the index snapshot, if one exists, into m_LocationMap.
-//
-// The file is accepted only when its header has the expected magic, version and
-// checksum, a non-zero payload alignment, and an entry count that fits in the
-// bytes following the header. On success m_PayloadAlignment is replaced by the
-// value stored in the header and entries failing ValidateEntry() are skipped
-// with a warning.
-//
-// Returns the log position recorded in the header, or 0 when there is no usable
-// index file.
-uint64_t
-CasContainerStrategy::ReadIndexFile()
-{
-    std::vector<CasDiskIndexEntry> Entries;
-    std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
-    if (std::filesystem::is_regular_file(IndexPath))
-    {
-        Stopwatch Timer;
-        const auto _ = MakeGuard([&] {
-            ZEN_INFO("read store '{}' index containing {} entries in {}",
-                     IndexPath,
-                     Entries.size(),
-                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-        });
-
-        BasicFile ObjectIndexFile;
-        ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
-        uint64_t Size = ObjectIndexFile.FileSize();
-        if (Size >= sizeof(CasDiskIndexHeader))
-        {
-            // Number of whole entries that fit after the header. Subtracting the
-            // real header size (not sizeof(size_t)) keeps a truncated file from
-            // passing the EntryCount check below and being read past its end.
-            uint64_t ExpectedEntryCount = (Size - sizeof(CasDiskIndexHeader)) / sizeof(CasDiskIndexEntry);
-            CasDiskIndexHeader Header;
-            ObjectIndexFile.Read(&Header, sizeof(Header), 0);
-            if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
-                (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
-                (Header.EntryCount <= ExpectedEntryCount))
-            {
-                Entries.resize(Header.EntryCount);
-                ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
-                // The stored locations were encoded with the header's alignment.
-                m_PayloadAlignment = Header.PayloadAlignment;
-
-                std::string InvalidEntryReason;
-                for (const CasDiskIndexEntry& Entry : Entries)
-                {
-                    if (!ValidateEntry(Entry, InvalidEntryReason))
-                    {
-                        ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
-                        continue;
-                    }
-                    m_LocationMap[Entry.Key] = Entry.Location;
-                }
-
-                return Header.LogPosition;
-            }
-            else
-            {
-                ZEN_WARN("skipping invalid index file '{}'", IndexPath);
-            }
-        }
-    }
-    return 0;
-}
-
-// Replays the append-only log on top of m_LocationMap, skipping the first
-// SkipEntryCount records (those already covered by the index snapshot).
-//
-// Tombstone records erase their key; other records are validated with
-// ValidateEntry() and then inserted/overwritten. If SkipEntryCount is larger than
-// the number of records in the log, the whole log is replayed instead.
-//
-// Returns the number of records passed to the replay handler, or 0 when the log
-// file does not exist or cannot be initialized.
-uint64_t
-CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
-{
-    std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
-    if (std::filesystem::is_regular_file(LogPath))
-    {
-        size_t LogEntryCount = 0;
-        Stopwatch Timer;
-        const auto _ = MakeGuard([&] {
-            ZEN_INFO("read store '{}' log containing {} entries in {}",
-                     m_RootDirectory / m_ContainerBaseName,
-                     LogEntryCount,
-                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-        });
-
-        TCasLogFile<CasDiskIndexEntry> CasLog;
-        CasLog.Open(LogPath, CasLogFile::Mode::kRead);
-        if (CasLog.Initialize())
-        {
-            uint64_t EntryCount = CasLog.GetLogCount();
-            if (EntryCount < SkipEntryCount)
-            {
-                ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
-                SkipEntryCount = 0;
-            }
-            // LogEntryCount is advanced once per replayed record only. Seeding it
-            // with (EntryCount - SkipEntryCount) as well would count every record
-            // twice in the log message and the return value.
-            CasLog.Replay(
-                [&](const CasDiskIndexEntry& Record) {
-                    LogEntryCount++;
-                    std::string InvalidEntryReason;
-                    if (Record.Flags & CasDiskIndexEntry::kTombstone)
-                    {
-                        m_LocationMap.erase(Record.Key);
-                        return;
-                    }
-                    if (!ValidateEntry(Record, InvalidEntryReason))
-                    {
-                        ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
-                        return;
-                    }
-                    m_LocationMap[Record.Key] = Record.Location;
-                },
-                SkipEntryCount);
-            return LogEntryCount;
-        }
-    }
-    return 0;
-}
-
-// Rebuilds the in-memory state from disk and opens the store for writing.
-//
-// Steps, in order: clear the location map; when IsNewStore is set, delete the
-// container directory; load the index snapshot; replay log records written after
-// that snapshot; make sure the container directory exists; open the log for
-// writing; initialize the block store with all known chunk locations; and write a
-// new snapshot when the store is new or any log records were replayed.
-void
-CasContainerStrategy::OpenContainer(bool IsNewStore)
-{
- // Add .running file and delete on clean on close to detect bad termination
- // NOTE(review): the comment above describes an intent; no such marker file is
- // created anywhere in this function.
-
- m_LocationMap.clear();
-
- std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName);
-
- if (IsNewStore)
- {
- std::filesystem::remove_all(BasePath);
- }
-
- // The snapshot's log position tells ReadLog how many log records to skip.
- m_LogFlushPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
-
- CreateDirectories(BasePath);
-
- std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
- m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
- // Decode every known location with the current payload alignment for the block store.
- std::vector<BlockStoreLocation> KnownLocations;
- KnownLocations.reserve(m_LocationMap.size());
- for (const auto& Entry : m_LocationMap)
- {
- const BlockStoreDiskLocation& Location = Entry.second;
- KnownLocations.push_back(Location.Get(m_PayloadAlignment));
- }
-
- m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
-
- // MakeIndexSnapshot() itself returns early if the log count equals m_LogFlushPosition.
- if (IsNewStore || (LogEntryCount > 0))
- {
- MakeIndexSnapshot();
- }
-
- // TODO: should validate integrity of container files here
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-
-namespace {
-    // Test helper: returns a buffer of Size bytes holding the values 0,1,2,...
-    // (wrapping at 256) in a random order. The generator is seeded once from
-    // std::random_device and shared between calls.
-    static IoBuffer CreateRandomChunk(uint64_t Size)
-    {
-        static std::random_device rd;
-        static std::mt19937 g(rd());
-
-        std::vector<uint8_t> Values(Size);
-        uint8_t NextValue = 0;
-        for (uint8_t& Value : Values)
-        {
-            Value = NextValue++;
-        }
-        std::shuffle(Values.begin(), Values.end(), g);
-
-        return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
-    }
-} // namespace
-
-// Round-trips 32-bit values through ToHexNumber / ParseHexNumber.
-TEST_CASE("compactcas.hex")
-{
-    uint32_t Value;
-
-    // An empty string must not parse.
-    CHECK(!ParseHexNumber("", Value));
-
-    char Hex[9];
-
-    // Number to encode and, where checked, the exact text expected from ToHexNumber.
-    // A null ExpectedText means only the round trip is verified.
-    struct RoundTripCase
-    {
-        uint32_t Number;
-        const char* ExpectedText;
-    };
-
-    const RoundTripCase Cases[] = {
-        {0u, nullptr},
-        {std::numeric_limits<std::uint32_t>::max(), "ffffffff"},
-        {0xadf14711u, "adf14711"},
-        {0x80000000u, "80000000"},
-        {0x718293a4u, "718293a4"},
-    };
-
-    for (const RoundTripCase& Case : Cases)
-    {
-        ToHexNumber(Case.Number, Hex);
-        std::string HexString = std::string(Hex);
-        if (Case.ExpectedText != nullptr)
-        {
-            CHECK(HexString == Case.ExpectedText);
-        }
-        CHECK(ParseHexNumber(HexString, Value));
-        CHECK(Value == Case.Number);
-    }
-}
-
-// Inserts 1000 small compact-binary objects, reads them back, then re-opens the
-// store from disk and reads them back again.
-TEST_CASE("compactcas.compact.gc")
-{
-    ScopedTemporaryDirectory TempDir;
-
-    const int kIterationCount = 1000;
-
-    std::vector<IoHash> Keys(kIterationCount);
-
-    // Looks up every recorded key and checks that the stored object carries the
-    // matching "id" value.
-    const auto VerifyAllChunks = [&](CasContainerStrategy& Store) {
-        for (int Index = 0; Index < kIterationCount; ++Index)
-        {
-            IoBuffer Chunk = Store.FindChunk(Keys[Index]);
-
-            CHECK(!!Chunk);
-
-            CbObject Value = LoadCompactBinaryObject(Chunk);
-
-            CHECK_EQ(Value["id"].AsInt32(), Index);
-        }
-    };
-
-    {
-        GcManager Gc;
-        CasContainerStrategy Cas(Gc);
-        Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
-
-        for (int Index = 0; Index < kIterationCount; ++Index)
-        {
-            CbObjectWriter Writer;
-            Writer << "id" << Index;
-            CbObject Obj = Writer.Save();
-
-            IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
-            const IoHash Hash = HashBuffer(ObjBuffer);
-
-            Cas.InsertChunk(ObjBuffer, Hash);
-
-            Keys[Index] = Hash;
-        }
-
-        VerifyAllChunks(Cas);
-    }
-
-    // The data must still be readable from a fresh store instance opened on the
-    // same directory after the original one has been closed.
-
-    {
-        GcManager Gc;
-        CasContainerStrategy Cas(Gc);
-        Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
-
-        VerifyAllChunks(Cas);
-    }
-}
-
-// Checks that StorageSize().DiskSize equals the total payload written, both right
-// after inserting and after re-opening the store twice.
-TEST_CASE("compactcas.compact.totalsize")
-{
-    std::random_device rd;
-    std::mt19937 g(rd());
-
-    // for (uint32_t i = 0; i < 100; ++i)
-    {
-        ScopedTemporaryDirectory TempDir;
-
-        const uint64_t kChunkSize = 1024;
-        const int32_t kChunkCount = 16;
-
-        // Opens the existing store and verifies the reported disk size.
-        const auto CheckReopenedSize = [&] {
-            GcManager Gc;
-            CasContainerStrategy Cas(Gc);
-            Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
-
-            const uint64_t TotalSize = Cas.StorageSize().DiskSize;
-            CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
-        };
-
-        {
-            GcManager Gc;
-            CasContainerStrategy Cas(Gc);
-            Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
-
-            for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
-            {
-                IoBuffer Chunk = CreateRandomChunk(kChunkSize);
-                const IoHash Hash = HashBuffer(Chunk);
-                CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
-                ZEN_ASSERT(InsertResult.New);
-            }
-
-            const uint64_t TotalSize = Cas.StorageSize().DiskSize;
-            CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
-        }
-
-        // First re-open.
-        CheckReopenedSize();
-
-        // Re-open again, this time we should have a snapshot
-        CheckReopenedSize();
-    }
-}
-
-// A single chunk that nothing retains must be gone after garbage collection.
-TEST_CASE("compactcas.gc.basic")
-{
-    ScopedTemporaryDirectory TempDir;
-
-    GcManager Gc;
-    CasContainerStrategy Cas(Gc);
-    Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);
-
-    IoBuffer Chunk = CreateRandomChunk(128);
-    IoHash ChunkHash = IoHash::HashBuffer(Chunk);
-
-    // First insert of this hash, so it must be reported as new.
-    const CasStore::InsertResult Inserted = Cas.InsertChunk(Chunk, ChunkHash);
-    CHECK(Inserted.New);
-    Cas.Flush();
-
-    // No retained cids are added, so the chunk is eligible for collection.
-    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
-    GcCtx.CollectSmallObjects(true);
-    Cas.CollectGarbage(GcCtx);
-
-    CHECK(!Cas.HaveChunk(ChunkHash));
-}
-
-// Same as the basic GC test, but the chunk is written by one store instance and
-// collected by a second instance opened on the same directory.
-TEST_CASE("compactcas.gc.removefile")
-{
-    ScopedTemporaryDirectory TempDir;
-
-    IoBuffer Chunk = CreateRandomChunk(128);
-    IoHash ChunkHash = IoHash::HashBuffer(Chunk);
-
-    // Writer instance: insert once (new), insert again (duplicate), flush, close.
-    {
-        GcManager Gc;
-        CasContainerStrategy Cas(Gc);
-        Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);
-
-        const CasStore::InsertResult FirstInsert = Cas.InsertChunk(Chunk, ChunkHash);
-        CHECK(FirstInsert.New);
-        const CasStore::InsertResult SecondInsert = Cas.InsertChunk(Chunk, ChunkHash);
-        CHECK(!SecondInsert.New);
-        Cas.Flush();
-    }
-
-    // Collector instance: re-open the existing store and run GC with nothing retained.
-    GcManager Gc;
-    CasContainerStrategy Cas(Gc);
-    Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false);
-
-    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
-    GcCtx.CollectSmallObjects(true);
-    Cas.CollectGarbage(GcCtx);
-
-    CHECK(!Cas.HaveChunk(ChunkHash));
-}
-
-TEST_CASE("compactcas.gc.compact")
-{
- // for (uint32_t i = 0; i < 100; ++i)
- {
- ScopedTemporaryDirectory TempDir;
-
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true);
-
- uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(9);
- for (uint64_t Size : ChunkSizes)
- {
- Chunks.push_back(CreateRandomChunk(Size));
- }
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(9);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New);
- CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New);
- CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New);
- CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New);
- CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New);
- CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New);
- CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New);
- CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New);
- CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New);
-
- CHECK(Cas.HaveChunk(ChunkHashes[0]));
- CHECK(Cas.HaveChunk(ChunkHashes[1]));
- CHECK(Cas.HaveChunk(ChunkHashes[2]));
- CHECK(Cas.HaveChunk(ChunkHashes[3]));
- CHECK(Cas.HaveChunk(ChunkHashes[4]));
- CHECK(Cas.HaveChunk(ChunkHashes[5]));
- CHECK(Cas.HaveChunk(ChunkHashes[6]));
- CHECK(Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- // Keep first and last
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
-
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[0]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(Cas.HaveChunk(ChunkHashes[0]));
- CHECK(!Cas.HaveChunk(ChunkHashes[1]));
- CHECK(!Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(!Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(!Cas.HaveChunk(ChunkHashes[6]));
- CHECK(!Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
-
- Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
- Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
- Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
- }
-
- // Keep last
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(!Cas.HaveChunk(ChunkHashes[0]));
- CHECK(!Cas.HaveChunk(ChunkHashes[1]));
- CHECK(!Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(!Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(!Cas.HaveChunk(ChunkHashes[6]));
- CHECK(!Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
-
- Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
- Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
- Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
- }
-
- // Keep mixed
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[1]);
- KeepChunks.push_back(ChunkHashes[4]);
- KeepChunks.push_back(ChunkHashes[7]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(!Cas.HaveChunk(ChunkHashes[0]));
- CHECK(Cas.HaveChunk(ChunkHashes[1]));
- CHECK(!Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(!Cas.HaveChunk(ChunkHashes[6]));
- CHECK(Cas.HaveChunk(ChunkHashes[7]));
- CHECK(!Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
-
- Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
- Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
- Cas.InsertChunk(Chunks[8], ChunkHashes[8]);
- }
-
- // Keep multiple at end
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[6]);
- KeepChunks.push_back(ChunkHashes[7]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(!Cas.HaveChunk(ChunkHashes[0]));
- CHECK(!Cas.HaveChunk(ChunkHashes[1]));
- CHECK(!Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(!Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(Cas.HaveChunk(ChunkHashes[6]));
- CHECK(Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
-
- Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
- Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
- Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- }
-
- // Keep every other
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[0]);
- KeepChunks.push_back(ChunkHashes[2]);
- KeepChunks.push_back(ChunkHashes[4]);
- KeepChunks.push_back(ChunkHashes[6]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- CHECK(Cas.HaveChunk(ChunkHashes[0]));
- CHECK(!Cas.HaveChunk(ChunkHashes[1]));
- CHECK(Cas.HaveChunk(ChunkHashes[2]));
- CHECK(!Cas.HaveChunk(ChunkHashes[3]));
- CHECK(Cas.HaveChunk(ChunkHashes[4]));
- CHECK(!Cas.HaveChunk(ChunkHashes[5]));
- CHECK(Cas.HaveChunk(ChunkHashes[6]));
- CHECK(!Cas.HaveChunk(ChunkHashes[7]));
- CHECK(Cas.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
-
- Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
- }
-
- // Verify that we nicely appended blocks even after all GC operations
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
- CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5])));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
- }
-}
-
-TEST_CASE("compactcas.gc.deleteblockonopen") // GC that retains every other chunk, then a second store instance on the same directory must report the same result
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; // one size per test chunk, handed to CreateRandomChunk below
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(20);
- for (uint64_t Size : ChunkSizes)
- {
- Chunks.push_back(CreateRandomChunk(Size));
- }
-
- std::vector<IoHash> ChunkHashes; // ChunkHashes[i] is the hash of Chunks[i]; used as the key for every store call
- ChunkHashes.reserve(20);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- { // first store instance: insert everything, then collect garbage
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); // NOTE(review): 1024 / 16 / true look like block size, alignment and a "new store" flag - confirm against Initialize()
-
- for (size_t i = 0; i < 20; i++)
- {
- CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); // every insert is expected to be reported as new
- }
-
- // GC every other block
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); // NOTE(review): timestamp 24h in the past; its role is defined by GcContext - confirm
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- for (size_t i = 0; i < 20; i += 2) // even indices only
- {
- KeepChunks.push_back(ChunkHashes[i]);
- }
- GcCtx.AddRetainedCids(KeepChunks);
-
- Cas.Flush();
- Cas.CollectGarbage(GcCtx);
-
- for (size_t i = 0; i < 20; i += 2)
- {
- CHECK(Cas.HaveChunk(ChunkHashes[i])); // retained hash must still be present
- CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); // odd neighbour was not retained; i + 1 is at most 19, so in range
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); // data read back must hash to its own key
- }
- }
- } // Cas and Gc leave scope here, before the second instance is created
- {
- // Re-open
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 1024, 16, false); // same directory and name as above; last argument is now false - presumably "open existing", confirm
-
- for (size_t i = 0; i < 20; i += 2) // same three checks as after the GC pass above
- {
- CHECK(Cas.HaveChunk(ChunkHashes[i]));
- CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
- }
- }
-}
-
-TEST_CASE("compactcas.gc.handleopeniobuffer") // a buffer obtained from FindChunk before a GC that retains nothing must still hash to its key afterwards
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; // one size per test chunk, handed to CreateRandomChunk below
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(20);
- for (const uint64_t& Size : ChunkSizes)
- {
- Chunks.push_back(CreateRandomChunk(Size));
- }
-
- std::vector<IoHash> ChunkHashes; // ChunkHashes[i] is the hash of Chunks[i]
- ChunkHashes.reserve(20);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); // NOTE(review): 1024 / 16 / true look like block size, alignment and a "new store" flag - confirm against Initialize()
-
- for (size_t i = 0; i < 20; i++)
- {
- CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); // every insert is expected to be reported as new
- }
-
- IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]); // obtained before the GC and kept alive across it
- Cas.Flush();
-
- // GC everything
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); // NOTE(review): timestamp 24h in the past; its role is defined by GcContext - confirm
- GcCtx.CollectSmallObjects(true);
- Cas.CollectGarbage(GcCtx); // AddRetainedCids is never called on this context
-
- for (size_t i = 0; i < 20; i++)
- {
- CHECK(!Cas.HaveChunk(ChunkHashes[i])); // the store must no longer report any of the 20 hashes
- }
-
- CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); // the buffer fetched earlier must still hash to its original key
-}
-
-TEST_CASE("compactcas.threadedinsert") // InsertChunk / FindChunk from a 4-thread pool, with CollectGarbage run on the test thread while inserts are in flight
-{
- // for (uint32_t i = 0; i < 100; ++i)
- {
- ScopedTemporaryDirectory TempDir;
-
- const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 4096;
- uint64_t ExpectedSize = 0; // running sum of Chunk.Size() over the first batch
-
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; // first batch, keyed by hash
- Chunks.reserve(kChunkCount);
-
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- while (true) // regenerate until the hash is not already a key, so Chunks ends with exactly kChunkCount entries
- {
- IoBuffer Chunk = CreateRandomChunk(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- if (Chunks.contains(Hash))
- {
- continue;
- }
- Chunks[Hash] = Chunk;
- ExpectedSize += Chunk.Size();
- break;
- }
- }
-
- std::atomic<size_t> WorkCompleted = 0; // bumped once per finished pool job; reset before each phase
- WorkerThreadPool ThreadPool(4);
- GcManager Gc;
- CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); // NOTE(review): 32768 / 16 / true look like block size, alignment and a "new store" flag - confirm against Initialize()
- { // phase 1: insert the whole first batch from the pool
- for (const auto& Chunk : Chunks)
- {
- const IoHash& Hash = Chunk.first;
- const IoBuffer& Buffer = Chunk.second;
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { // Buffer and Hash are captured by value
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
- ZEN_ASSERT(InsertResult.New);
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size()) // poll until every scheduled job has bumped the counter
- {
- Sleep(1);
- }
- }
-
- WorkCompleted = 0;
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_LE(ExpectedSize, TotalSize); // reported disk size is at least the summed chunk sizes
- CHECK_GE(ExpectedSize + 32768, TotalSize); // and exceeds them by at most 32768, the value passed as third argument to Initialize
-
- { // phase 2: read every first-batch chunk back from the pool
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { // Chunk is captured by reference; the wait loop below finishes before Chunks is touched again
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- IoHash Hash = IoHash::HashBuffer(Buffer);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
- }
-
- std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes; // hashes the test currently believes are in the store; touched only by this thread
- GcChunkHashes.reserve(Chunks.size());
- for (const auto& Chunk : Chunks)
- {
- GcChunkHashes.insert(Chunk.first);
- }
- { // phase 3: second batch of inserts plus reads on the pool, GC passes on this thread
- WorkCompleted = 0;
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
- NewChunks.reserve(kChunkCount);
-
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- IoBuffer Chunk = CreateRandomChunk(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- NewChunks[Hash] = Chunk; // unlike the first batch there is no duplicate-hash retry; later loops use NewChunks.size()
- }
-
- std::atomic_uint32_t AddedChunkCount; // NOTE(review): no initializer, yet read in the loop below; zero only where std::atomic's default constructor value-initializes (C++20) - consider {0}
-
- for (const auto& Chunk : NewChunks)
- {
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { // Chunk (the key/value pair) is captured by value
- Cas.InsertChunk(Chunk.second, Chunk.first); // result is not checked here
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
- }
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- if (Buffer) // the lookup may come back empty while GC passes run; only a returned buffer is verified
- {
- CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
- }
- WorkCompleted.fetch_add(1);
- });
- }
-
- while (AddedChunkCount.load() < NewChunks.size()) // keep running GC passes until every second-batch insert job has finished
- {
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
- {
- if (Cas.HaveChunk(Chunk.first)) // start tracking second-batch hashes the store already reports
- {
- GcChunkHashes.emplace(Chunk.first);
- }
- }
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
- size_t C = 0;
- while (C < KeepHashes.size()) // thin the keep list so the GC pass has something to delete
- {
- if (C % 155 == 0) // at every 155th index, drop up to two entries (C and C + 3)
- {
- if (C < KeepHashes.size() - 1) // skipped when C is already the last element
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; // overwrite with the tail, then shrink: the hash formerly at C leaves the list
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- }
- C++;
- }
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); // NOTE(review): timestamp 24h in the past; its role is defined by GcContext - confirm
- GcCtx.CollectSmallObjects(true);
- GcCtx.AddRetainedCids(KeepHashes);
- Cas.CollectGarbage(GcCtx);
- const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); // stop tracking whatever this pass reported as deleted
- }
-
- while (WorkCompleted < NewChunks.size() + Chunks.size()) // wait for both the insert jobs and the read jobs
- {
- Sleep(1);
- }
-
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
- {
- if (Cas.HaveChunk(Chunk.first))
- {
- GcChunkHashes.emplace(Chunk.first);
- }
- }
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); // one more GC pass, same steps as inside the loop above, now with no pool jobs outstanding
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 155 == 0)
- {
- if (C < KeepHashes.size() - 1)
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- }
- C++;
- }
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- GcCtx.AddRetainedCids(KeepHashes);
- Cas.CollectGarbage(GcCtx);
- const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
- }
- { // phase 4: every hash still tracked must be present and hash correctly
- WorkCompleted = 0;
- for (const IoHash& ChunkHash : GcChunkHashes)
- {
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
- CHECK(Cas.HaveChunk(ChunkHash));
- CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < GcChunkHashes.size())
- {
- Sleep(1);
- }
- }
- }
-}
-
-#endif
-
-void
-compactcas_forcelink() // NOTE(review): presumably exists only to be referenced from another translation unit so this file (and its tests) gets linked - confirm
-{ // intentionally empty
-}
-
-} // namespace zen