aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-03-15 13:09:02 +0100
committerDan Engelbrecht <[email protected]>2022-03-31 11:28:32 +0200
commitc7dfd50ff6ead3ffbba99500885cbe9ffe8975a6 (patch)
tree294c5525248199f827ccf5f5020fc8267b4a5a0b
parentWIP (diff)
downloadzen-c7dfd50ff6ead3ffbba99500885cbe9ffe8975a6.tar.xz
zen-c7dfd50ff6ead3ffbba99500885cbe9ffe8975a6.zip
Use blocks to store data in cas stores
-rw-r--r--zenstore/cas.cpp4
-rw-r--r--zenstore/chunkbundler.cpp1242
-rw-r--r--zenstore/chunkbundler.h105
-rw-r--r--zenstore/compactcas.cpp960
-rw-r--r--zenstore/compactcas.h56
5 files changed, 737 insertions, 1630 deletions
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp
index a90e45c04..5b13e27fd 100644
--- a/zenstore/cas.cpp
+++ b/zenstore/cas.cpp
@@ -150,8 +150,8 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig)
// Initialize payload storage
m_LargeStrategy.Initialize(IsNewStore);
- m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
- m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
+ m_TinyStrategy.Initialize("tobs", 1L << 30, 16, IsNewStore);
+ m_SmallStrategy.Initialize("sobs", 1L << 30, 4096, IsNewStore);
}
bool
diff --git a/zenstore/chunkbundler.cpp b/zenstore/chunkbundler.cpp
deleted file mode 100644
index 0e81dc8dc..000000000
--- a/zenstore/chunkbundler.cpp
+++ /dev/null
@@ -1,1242 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "chunkbundler.h"
-
-#include <zencore/except.h>
-#include <zencore/fmtutils.h>
-#include <zencore/iobuffer.h>
-#include <zencore/logging.h>
-#include <zencore/testing.h>
-
-#if ZEN_WITH_TESTS
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/compress.h>
-# include <zencore/filesystem.h>
-# include <zencore/testutils.h>
-# include <algorithm>
-# include <random>
-#endif
-
-namespace zen {
-
-static uint64_t
-AlignPositon(uint64_t Offset, uint64_t Alignment)
-{
- return (Offset + Alignment - 1) & ~(Alignment - 1);
-}
-
-ChunkBundler::ChunkBundler(std::filesystem::path RootDirectory, ChunkBundlerValidator* Validator)
-: m_Validator(Validator)
-, m_Log(logging::Get("chunkbundler"))
-, m_RootDirectory(RootDirectory)
-{
-}
-
-ChunkBundler::~ChunkBundler()
-{
-}
-
-void
-ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxBlockSize, uint64_t Alignment, bool IsNewStore)
-{
- ZEN_ASSERT(IsPow2(Alignment));
- ZEN_ASSERT(!m_IsInitialized);
- ZEN_ASSERT(MaxBlockSize > 0);
-
- m_ContainerBaseName = ContainerBaseName;
- m_PayloadAlignment = Alignment;
- m_MaxBlockSize = MaxBlockSize;
-
- std::filesystem::path SobsPath = m_RootDirectory / (m_ContainerBaseName + ".ucas");
- std::filesystem::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog");
-
- m_TotalSize = 0;
-
- m_LocationMap.clear();
-
- std::filesystem::path SidxPath = m_RootDirectory / (m_ContainerBaseName + ".uidx");
- if (std::filesystem::exists(SidxPath))
- {
- BasicFile SmallObjectIndex;
- SmallObjectIndex.Open(SidxPath, false);
- uint64_t Size = SmallObjectIndex.FileSize();
- uint64_t EntryCount = Size / sizeof(CompactDiskIndexEntry);
- std::vector<CompactDiskIndexEntry> Entries{EntryCount};
- SmallObjectIndex.Read(Entries.data(), Size, 0);
- for (const auto& Entry : Entries)
- {
- m_LocationMap[Entry.Key] = Entry.Location;
- }
- SmallObjectIndex.Close();
- }
-
- m_OpLog.Open(SlogPath, IsNewStore);
- m_OpLog.Replay([&](const CompactDiskIndexEntry& Record) {
- if (Record.Flags & CompactDiskIndexEntry::kTombstone)
- {
- m_LocationMap.erase(Record.Key);
- }
- else
- {
- m_LocationMap[Record.Key] = Record.Location;
- }
- });
-
- std::unordered_set<uint16_t> ReferencedBlockIndexes;
- for (const auto& Entry : m_LocationMap)
- {
- const auto& Location = Entry.second;
- m_TotalSize.fetch_add(Location.Size);
- ReferencedBlockIndexes.insert(Location.BlockIndex);
- }
-
- uint32_t SmallestBlockSize = 0xffffffffu;
- CreateDirectories(m_RootDirectory / "ucas");
- for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_RootDirectory / "ucas"))
- {
- if (Entry.is_regular_file())
- {
- if (IsNewStore)
- {
- std::filesystem::remove(Entry.path());
- continue;
- }
- if (Entry.path().extension() == ".ucas")
- {
- try
- {
- std::string FileName = Entry.path().stem().string();
- uint16_t BlockIndex = static_cast<uint16_t>(std::stoi(FileName));
- if (!ReferencedBlockIndexes.contains(BlockIndex))
- {
- // Clear out unused blocks
- std::filesystem::remove(Entry.path());
- continue;
- }
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(Entry.path(), false);
- m_OpenBlocks[BlockIndex] = SmallObjectFile;
- if (SmallObjectFile->FileSize() < SmallestBlockSize)
- {
- m_CurrentBlockIndex = BlockIndex;
- SmallestBlockSize = gsl::narrow<std::uint32_t>(SmallObjectFile->FileSize());
- }
- }
- catch (const std::invalid_argument&)
- {
- // Non-valid file, skip it (or should we remove it?)
- }
- }
- }
- }
- if (m_OpenBlocks.empty())
- {
- std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas");
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(path, true);
- m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
- m_CurrentBlock = SmallObjectFile;
- m_CurrentInsertOffset = 0;
- }
- else
- {
- m_CurrentBlock = m_OpenBlocks[m_CurrentBlockIndex];
- m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment));
- }
-
- // TODO: should validate integrity of container files here
-
- m_IsInitialized = true;
-}
-
-ChunkBundler::InsertResult
-ChunkBundler::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
-{
- RwLock::ExclusiveLockScope _i(m_InsertLock);
-
- {
- RwLock::SharedLockScope _l(m_LocationMapLock);
- auto KeyIt = m_LocationMap.find(ChunkHash);
-
- if (KeyIt != m_LocationMap.end())
- {
- return InsertResult{.New = false};
- }
- }
-
- // New entry
-
- uint64_t CurrentBlockSize = m_CurrentBlock.lock()->FileSize();
- if (CurrentBlockSize + m_CurrentInsertOffset > m_MaxBlockSize)
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- uint16_t NewBlockIndex = m_CurrentBlockIndex + 1;
- while (m_OpenBlocks.contains(NewBlockIndex))
- {
- NewBlockIndex++;
- if (NewBlockIndex == m_CurrentBlockIndex)
- {
- throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName));
- }
- }
- m_CurrentBlockIndex = NewBlockIndex;
- std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas");
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(path, true);
- m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
- m_CurrentBlock = SmallObjectFile;
- m_CurrentInsertOffset = 0;
- }
- const uint32_t InsertOffset = m_CurrentInsertOffset;
- m_CurrentBlock.lock()->Write(ChunkData, ChunkSize, InsertOffset);
- m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment));
-
- const CompactDiskLocation Location{m_CurrentBlockIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)};
- CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
-
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- m_LocationMap[ChunkHash] = Location;
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize));
- m_OpLog.Append(IndexEntry);
-
- return InsertResult{.New = true};
-}
-
-ChunkBundler::InsertResult
-ChunkBundler::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
-{
- return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
-}
-
-IoBuffer
-ChunkBundler::FindChunk(const IoHash& ChunkHash)
-{
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
- {
- const CompactDiskLocation& Location = KeyIt->second;
-
- if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end())
- {
- return IoBufferBuilder::MakeFromFileHandle(BlockIt->second->Handle(), Location.Offset, Location.Size);
- }
- }
-
- // Not found
-
- return IoBuffer();
-}
-
-bool
-ChunkBundler::HaveChunk(const IoHash& ChunkHash)
-{
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- return m_LocationMap.contains(ChunkHash);
-}
-
-void
-ChunkBundler::FilterChunks(CasChunkSet& InOutChunks)
-{
- // This implementation is good enough for relatively small
- // chunk sets (in terms of chunk identifiers), but would
- // benefit from a better implementation which removes
- // items incrementally for large sets, especially when
- // we're likely to already have a large proportion of the
- // chunks in the set
-
- InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
-}
-
-void
-ChunkBundler::Flush()
-{
- RwLock::ExclusiveLockScope _l(m_InsertLock);
- m_OpLog.Flush();
- m_CurrentBlock.lock()->Flush();
-}
-
-void
-ChunkBundler::Scrub(ScrubContext& Ctx)
-{
- const uint64_t WindowSize = 4 * 1024 * 1024;
-
- std::vector<CompactDiskIndexEntry> BigChunks;
- std::vector<CompactDiskIndexEntry> BadChunks;
-
- // We do a read sweep through the payloads file and validate
- // any entries that are contained within each segment, with
- // the assumption that most entries will be checked in this
- // pass. An alternative strategy would be to use memory mapping.
-
- {
- IoBuffer ReadBuffer{WindowSize};
- void* BufferBase = ReadBuffer.MutableData();
-
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- for (const auto& Block : m_OpenBlocks)
- {
- uint64_t WindowStart = 0;
- uint64_t WindowEnd = WindowSize;
- auto& SmallObjectFile = *Block.second;
- const uint64_t FileSize = SmallObjectFile.FileSize();
-
- do
- {
- const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
- SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart);
-
- for (auto& Entry : m_LocationMap)
- {
- const uint64_t EntryOffset = Entry.second.Offset;
-
- if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
- {
- const uint64_t EntryEnd = EntryOffset + Entry.second.Size;
-
- if (EntryEnd >= WindowEnd)
- {
- BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
-
- continue;
- }
-
- if (m_Validator &&
- !m_Validator->ValidateChunk(IoBuffer(IoBuffer::Wrap,
- reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart,
- Entry.second.Size),
- Entry.first))
- {
- // Hash mismatch
- BadChunks.push_back({.Key = Entry.first, .Location = Entry.second});
- }
- }
- }
-
- WindowStart += WindowSize;
- WindowEnd += WindowSize;
- } while (WindowStart < FileSize);
- }
-
- // TODO: Figure out API for this
- // Deal with large chunks
-#if 0
- for (const auto& _ : BigChunks)
- {
- auto& ___ = *m_OpenBlocks[Entry.Location.BlockIndex];
- /* IoHashStream Hasher;
- m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) {
- Hasher.Append(Data, Size);
- });
- IoHash ComputedHash = Hasher.GetHash();
-
- if (Entry.Key != ComputedHash)
- {
- BadChunks.push_back(Entry);
- }
- */
- }
-#endif // 0
- }
-
- if (BadChunks.empty())
- {
- return;
- }
-
- ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName);
-
- // Deal with bad chunks by removing them from our lookup map
-
- std::vector<IoHash> BadChunkHashes;
-
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- for (const CompactDiskIndexEntry& Entry : BadChunks)
- {
- BadChunkHashes.push_back(Entry.Key);
- m_OpLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CompactDiskIndexEntry::kTombstone});
- m_LocationMap.erase(Entry.Key);
- }
-
- // Let whomever it concerns know about the bad chunks. This could
- // be used to invalidate higher level data structures more efficiently
- // than a full validation pass might be able to do
-
- Ctx.ReportBadCasChunks(BadChunkHashes);
-}
-
-void
-ChunkBundler::CollectGarbage(GcContext& GcCtx)
-{
- namespace fs = std::filesystem;
-
- // It collects all the blocks that we want to delete chunks from. For each such
- // block we keep a list of chunks to retain.
- //
- // It will first remove any chunks that are flushed from the m_LocationMap.
- //
- // It then checks to see if we want to purge any chunks that are in the currently
- // active block. If so, we break off the current block and start on a new block,
- // otherwise we just let the active block be.
- //
- // Next it will iterate over all blocks that we want to remove chunks from.
- // If the block is empty after removal of chunks we mark the block as pending
- // delete - we want to delete it as soon as there are no IoBuffers using the
- // block file.
- //
- // If the block is non-empty we write out the chunks we want to keep to a new
- // block file (creating new block files as needed).
- //
- // We update the index as we complete each new block file. This makes it possible
- // to break the GC if we want to limit time for execution.
- //
- // GC can fairly parallell to regular operation - it will block while figuring
- // out which chunks to remove and what blocks to rewrite but the actual
- // reading and writing of data to new block files does not block regular operation.
- //
- // While moving blocks it will do a blocking operation and update the m_LocationMap
- // after each new block is written and it will also block when figuring out the
- // path to the next new block.
-
- ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName);
-
- std::unordered_map<uint64_t, size_t> BlockIndexToKeepChunksMap;
- std::vector<std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher>> KeepChunks;
- std::vector<IoHash> DeletedChunks;
- std::unordered_set<uint16_t> BlocksToReWrite;
- {
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- RwLock::ExclusiveLockScope _l(m_LocationMapLock);
-
- m_OpLog.Flush();
- m_CurrentBlock.lock()->Flush();
-
- BlocksToReWrite.reserve(m_OpenBlocks.size());
-
- if (m_LocationMap.empty())
- {
- ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_RootDirectory / m_ContainerBaseName);
- return;
- }
-
- const uint64_t TotalChunkCount = m_LocationMap.size();
- uint64_t TotalSize = m_TotalSize.load();
-
- std::vector<IoHash> TotalChunkHashes;
- TotalChunkHashes.reserve(m_LocationMap.size());
- for (const auto& Entry : m_LocationMap)
- {
- TotalChunkHashes.push_back(Entry.first);
- if (BlockIndexToKeepChunksMap.contains(Entry.second.BlockIndex))
- {
- continue;
- }
- BlockIndexToKeepChunksMap[Entry.second.BlockIndex] = KeepChunks.size();
- KeepChunks.resize(KeepChunks.size() + 1);
- }
-
- const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
-
- uint64_t NewTotalSize = 0;
- GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
- if (Keep)
- {
- auto KeyIt = m_LocationMap.find(ChunkHash);
- const auto& ChunkLocation = KeyIt->second;
- auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[ChunkLocation.BlockIndex]];
- ChunkMap[ChunkHash] = ChunkLocation;
- NewTotalSize += ChunkLocation.Size;
- }
- else
- {
- DeletedChunks.push_back(ChunkHash);
- }
- });
-
- if (!PerformDelete)
- {
- ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
- m_RootDirectory / m_ContainerBaseName,
- DeletedChunks.size(),
- NiceBytes(TotalSize - NewTotalSize),
- TotalChunkCount,
- NiceBytes(TotalSize));
- return;
- }
-
- for (const auto& ChunkHash : DeletedChunks)
- {
- auto KeyIt = m_LocationMap.find(ChunkHash);
- const auto& ChunkLocation = KeyIt->second;
- BlocksToReWrite.insert(ChunkLocation.BlockIndex);
- m_OpLog.Append({.Key = ChunkHash, .Location = ChunkLocation, .Flags = CompactDiskIndexEntry::kTombstone});
- m_LocationMap.erase(ChunkHash);
- m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.Size));
- }
-
- if (BlocksToReWrite.contains(m_CurrentBlockIndex))
- {
- uint16_t NewBlockIndex = m_CurrentBlockIndex + 1;
- while (m_OpenBlocks.contains(NewBlockIndex))
- {
- NewBlockIndex++;
- if (NewBlockIndex == m_CurrentBlockIndex)
- {
- ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded",
- m_ContainerBaseName,
- std::numeric_limits<uint16_t>::max() + 1);
- return;
- }
- }
- m_CurrentBlockIndex = NewBlockIndex;
- std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas");
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(path, true);
- m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
- m_CurrentBlock = SmallObjectFile;
- m_CurrentInsertOffset = 0;
- }
- }
-
- // Move all chunks in blocks that have chunks removed to new blocks
-
- std::shared_ptr<BasicFile> NewBlockFile;
- uint64_t WriteOffset = {};
- uint16_t NewBlockIndex = {};
- std::unordered_map<IoHash, CompactDiskLocation> MovedBlocks;
-
- for (auto BlockIndex : BlocksToReWrite)
- {
- auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[BlockIndex]];
- if (ChunkMap.empty())
- {
- // The block has no references to it, it should be removed as soon as no references is held on the file
- // TODO: We currently don't know if someone is holding a IoBuffer for this block at this point!
-
- // std::filesystem::path BlockPath = m_RootDirectory / "ucas" / (std::to_string(BlockIndex) + ".ucas");
- // RwLock::ExclusiveLockScope _i(m_InsertLock);
- // auto BlockFile = m_OpenBlocks[BlockIndex];
- // m_OpenBlocks.erase(BlockIndex);
- // BlockFile->Close();
- // fs::remove(BlockPath);
- continue;
- }
-
- std::shared_ptr<BasicFile> BlockFile;
- {
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- BlockFile = m_OpenBlocks[BlockIndex];
- }
-
- {
- std::vector<uint8_t> Chunk;
- for (auto& Entry : ChunkMap)
- {
- const CompactDiskLocation& ChunkLocation = Entry.second;
- Chunk.resize(ChunkLocation.Size);
- BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
-
- if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
- {
- {
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- if (NewBlockFile)
- {
- m_OpenBlocks[NewBlockIndex] = NewBlockFile;
- RwLock::ExclusiveLockScope _l(m_LocationMapLock);
- for (const auto& MovedEntry : MovedBlocks)
- {
- m_LocationMap[MovedEntry.first] = MovedEntry.second;
- m_OpLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
- }
- }
- NewBlockIndex = m_CurrentBlockIndex + 1;
- while (m_OpenBlocks.contains(NewBlockIndex))
- {
- NewBlockIndex++;
- if (NewBlockIndex == m_CurrentBlockIndex)
- {
- ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded",
- m_ContainerBaseName,
- std::numeric_limits<uint16_t>::max() + 1);
- return;
- }
- }
- m_OpenBlocks[NewBlockIndex] = std::shared_ptr<BasicFile>(); // Make sure nobody steals this slot
- }
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_RootDirectory, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message());
- return;
- }
-
- if (Space.Free < (m_MaxBlockSize * 2)) // Never let GC steal the last block space
- {
- ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}",
- m_RootDirectory / m_ContainerBaseName,
- m_MaxBlockSize * m_MaxBlockSize,
- NiceBytes(Space.Free));
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- m_OpenBlocks.erase(NewBlockIndex);
- return;
- }
-
- std::filesystem::path NewBlockPath = m_RootDirectory / "ucas" / (std::to_string(NewBlockIndex) + ".ucas");
- NewBlockFile = std::make_shared<BasicFile>();
- NewBlockFile->Open(NewBlockPath, true);
- MovedBlocks.clear();
- WriteOffset = 0;
- }
-
- NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
- CompactDiskLocation NewChunkLocation(NewBlockIndex,
- gsl::narrow<uint32_t>(WriteOffset),
- gsl::narrow<uint32_t>(Chunk.size()));
- Entry.second = {.BlockIndex = NewBlockIndex,
- .Offset = gsl::narrow<uint32_t>(WriteOffset),
- .Size = gsl::narrow<uint32_t>(Chunk.size())};
- MovedBlocks[Entry.first] = Entry.second;
- WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment);
- }
- Chunk.clear();
-
- // Remap moved chunks to the new block file
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- if (NewBlockFile)
- {
- m_OpenBlocks[NewBlockIndex] = NewBlockFile;
- RwLock::ExclusiveLockScope _l(m_LocationMapLock);
- for (const auto& MovedEntry : MovedBlocks)
- {
- m_LocationMap[MovedEntry.first] = MovedEntry.second;
- m_OpLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
- }
- }
- }
- }
-
- GcCtx.DeletedCas(DeletedChunks);
-
- ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_RootDirectory / m_ContainerBaseName, DeletedChunks.size());
-
- MakeIndexSnapshot();
-}
-
-void
-ChunkBundler::MakeIndexSnapshot()
-{
- ZEN_INFO("writing index snapshot for '{}'", m_RootDirectory / m_ContainerBaseName);
-
- namespace fs = std::filesystem;
-
- fs::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog");
- fs::path SidxPath = m_RootDirectory / (m_ContainerBaseName + ".uidx");
- fs::path STmplogPath = m_RootDirectory / (m_ContainerBaseName + ".tmp.ulog");
- fs::path STmpSidxPath = m_RootDirectory / (m_ContainerBaseName + ".tmp.uidx");
- fs::path SRecoveredlogPath = m_RootDirectory / (m_ContainerBaseName + ".recover.ulog");
-
- // Move log and index away, we keep them if something goes wrong, any new chunks will be added to the new log
- {
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- m_OpLog.Close();
-
- if (fs::exists(STmplogPath))
- {
- fs::remove(STmplogPath);
- }
- if (fs::exists(STmpSidxPath))
- {
- fs::remove(STmpSidxPath);
- }
-
- fs::rename(SlogPath, STmplogPath);
- if (fs::exists(SidxPath))
- {
- fs::rename(SidxPath, STmpSidxPath);
- }
-
- // Open an new log
- m_OpLog.Open(SlogPath, true);
- }
-
- try
- {
- // Write the current state of the location map to a new index state
- std::vector<CompactDiskIndexEntry> Entries;
-
- {
- RwLock::SharedLockScope _l(m_LocationMapLock);
- Entries.resize(m_LocationMap.size());
-
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_LocationMap)
- {
- CompactDiskIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = Entry.second;
- }
- }
-
- BasicFile SmallObjectIndex;
- SmallObjectIndex.Open(SidxPath, true);
- SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CompactDiskIndexEntry), 0);
- SmallObjectIndex.Close();
- }
- catch (std::exception& Err)
- {
- ZEN_ERROR("snapshot FAILED, reason '{}'", Err.what());
-
- // Reconstruct the log from old log and any added log entries
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- if (fs::exists(STmplogPath))
- {
- std::vector<CompactDiskIndexEntry> Records;
- Records.reserve(m_LocationMap.size());
- {
- TCasLogFile<CompactDiskIndexEntry> OldOpLog;
- OldOpLog.Open(STmplogPath, false);
- OldOpLog.Replay([&](const CompactDiskIndexEntry& Record) { Records.push_back(Record); });
- }
- {
- m_OpLog.Replay([&](const CompactDiskIndexEntry& Record) { Records.push_back(Record); });
- }
-
- TCasLogFile<CompactDiskIndexEntry> RecoveredOpLog;
- RecoveredOpLog.Open(SRecoveredlogPath, true);
- for (const auto& Record : Records)
- {
- RecoveredOpLog.Append(Record);
- }
- RecoveredOpLog.Close();
-
- fs::remove(SlogPath);
- fs::rename(SRecoveredlogPath, SlogPath);
- fs::remove(STmplogPath);
- }
-
- if (fs::exists(SidxPath))
- {
- fs::remove(SidxPath);
- }
-
- // Restore any previous snapshot
- if (fs::exists(STmpSidxPath))
- {
- fs::remove(SidxPath);
- fs::rename(STmpSidxPath, SidxPath);
- }
- }
- if (fs::exists(STmpSidxPath))
- {
- fs::remove(STmpSidxPath);
- }
-}
-
-GcStorageSize
-ChunkBundler::StorageSize() const
-{
- return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)};
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-
-namespace {
- static IoBuffer CreateChunk(uint64_t Size)
- {
- static std::random_device rd;
- static std::mt19937 g(rd());
-
- std::vector<uint8_t> Values;
- Values.resize(Size);
- for (size_t Idx = 0; Idx < Size; ++Idx)
- {
- Values[Idx] = static_cast<uint8_t>(Idx);
- }
- std::shuffle(Values.begin(), Values.end(), g);
-
- return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
- }
-} // namespace
-
-#endif
-
-TEST_CASE("chunkbundler.reopen")
-{
- ScopedTemporaryDirectory TempDir;
-
- CreateDirectories(TempDir.Path());
-
- const int kIterationCount = 1000;
-
- std::vector<IoHash> Keys(kIterationCount);
-
- {
- ChunkBundler Store(TempDir.Path(), nullptr);
- Store.Initialize("test", 65536, 16, true);
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- CbObjectWriter Cbo;
- Cbo << "id" << i;
- CbObject Obj = Cbo.Save();
-
- IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
- const IoHash Hash = HashBuffer(ObjBuffer);
-
- Store.InsertChunk(ObjBuffer, Hash);
-
- Keys[i] = Hash;
- }
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- IoBuffer Chunk = Store.FindChunk(Keys[i]);
-
- CHECK(!!Chunk);
-
- CbObject Value = LoadCompactBinaryObject(Chunk);
-
- CHECK_EQ(Value["id"].AsInt32(), i);
- }
- }
-
- // Validate that we can still read the inserted data after closing
- // the original store
-
- {
- ChunkBundler Store(TempDir.Path(), nullptr);
- Store.Initialize("test", 65536, 16, false);
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- IoBuffer Chunk = Store.FindChunk(Keys[i]);
-
- CHECK(!!Chunk);
-
- CbObject Value = LoadCompactBinaryObject(Chunk);
-
- CHECK_EQ(Value["id"].AsInt32(), i);
- }
-
- GcContext Ctx;
- Store.CollectGarbage(Ctx);
- }
-}
-
-TEST_CASE("chunkbundler.totalsize")
-{
- std::random_device rd;
- std::mt19937 g(rd());
-
- const auto CreateChunk = [&](uint64_t Size) -> IoBuffer {
- const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t));
- std::vector<uint32_t> Values;
- Values.resize(Count);
- for (size_t Idx = 0; Idx < Count; ++Idx)
- {
- Values[Idx] = static_cast<uint32_t>(Idx);
- }
- std::shuffle(Values.begin(), Values.end(), g);
-
- return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t));
- };
-
- ScopedTemporaryDirectory TempDir;
-
- CreateDirectories(TempDir.Path());
-
- const uint64_t kChunkSize = 1024;
- const int32_t kChunkCount = 16;
-
- {
- ChunkBundler Store(TempDir.Path(), nullptr);
- Store.Initialize("test", 65536, 16, true);
-
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- IoBuffer Chunk = CreateChunk(kChunkSize);
- const IoHash Hash = HashBuffer(Chunk);
- auto InsertResult = Store.InsertChunk(Chunk, Hash);
- ZEN_ASSERT(InsertResult.New);
- }
-
- const uint64_t TotalSize = Store.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
- }
-
- {
- ChunkBundler Store(TempDir.Path(), nullptr);
- Store.Initialize("test", 65536, 16, false);
-
- const uint64_t TotalSize = Store.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
- }
-}
-
-TEST_CASE("chunkbundler.gc.basic")
-{
- ScopedTemporaryDirectory TempDir;
-
- ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
-
- ChunkBundlerStore.Initialize("cb", 65536, 1 << 4, true);
-
- IoBuffer Chunk = CreateChunk(128);
- IoHash ChunkHash = IoHash::HashBuffer(Chunk);
-
- const auto InsertResult = ChunkBundlerStore.InsertChunk(Chunk, ChunkHash);
-
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
-
- ChunkBundlerStore.CollectGarbage(GcCtx);
-
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHash));
-}
-
-TEST_CASE("chunkbundler.gc.compact")
-{
- ScopedTemporaryDirectory TempDir;
-
- ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
- ChunkBundlerStore.Initialize("cb", 2048, 1 << 4, true);
-
- uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(9);
- for (const auto& Size : ChunkSizes)
- {
- Chunks.push_back(CreateChunk(Size));
- }
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(9);
- for (const auto& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]).New);
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]).New);
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]).New);
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]).New);
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]).New);
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]).New);
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]).New);
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]).New);
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[8], ChunkHashes[8]).New);
-
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[1]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[2]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[3]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[5]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[7]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8]));
-
- auto InitialSize = ChunkBundlerStore.StorageSize().DiskSize;
-
- // Keep first and last
- {
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
-
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[0]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
-
- ChunkBundlerStore.CollectGarbage(GcCtx);
-
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[4]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[6]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8])));
- }
-
- ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]);
- ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]);
- ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]);
- ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]);
- ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]);
- ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]);
- ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]);
-
- // Keep last
- {
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
-
- ChunkBundlerStore.CollectGarbage(GcCtx);
-
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[0]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[4]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[6]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8])));
-
- ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]);
- ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]);
- ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]);
- ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]);
- ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]);
- ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]);
- ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]);
- }
-
- // Keep mixed
- {
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[1]);
- KeepChunks.push_back(ChunkHashes[4]);
- KeepChunks.push_back(ChunkHashes[7]);
- GcCtx.ContributeCas(KeepChunks);
-
- ChunkBundlerStore.CollectGarbage(GcCtx);
-
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[0]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[1]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[6]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[7]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[7])));
-
- ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]);
- ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]);
- ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]);
- ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]);
- ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]);
- ChunkBundlerStore.InsertChunk(Chunks[8], ChunkHashes[8]);
- }
-
- // Keep multiple at end
- {
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[6]);
- KeepChunks.push_back(ChunkHashes[7]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
-
- ChunkBundlerStore.CollectGarbage(GcCtx);
-
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[0]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[4]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[7]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8])));
-
- ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]);
- ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]);
- ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]);
- ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]);
- ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]);
- ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]);
- }
-
- // Keep every other
- {
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- KeepChunks.push_back(ChunkHashes[0]);
- KeepChunks.push_back(ChunkHashes[2]);
- KeepChunks.push_back(ChunkHashes[4]);
- KeepChunks.push_back(ChunkHashes[6]);
- KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
-
- ChunkBundlerStore.CollectGarbage(GcCtx);
-
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[2]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7]));
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8]));
-
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[2] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[2])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8])));
-
- ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]);
- ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]);
- ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]);
- ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]);
- }
-
- // Verify that we nicely appended blocks even after all GC operations
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[2] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[2])));
- CHECK(ChunkHashes[3] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[3])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[5] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[5])));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8])));
-
- auto FinalSize = ChunkBundlerStore.StorageSize().DiskSize;
- CHECK(InitialSize == FinalSize);
-}
-
-TEST_CASE("chunkbundler.gc.deleteblockonopen")
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(20);
- for (const auto& Size : ChunkSizes)
- {
- Chunks.push_back(CreateChunk(Size));
- }
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(20);
- for (const auto& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- {
- ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
- ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, true);
-
- for (size_t i = 0; i < 20; i++)
- {
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[i], ChunkHashes[i]).New);
- }
-
- // GC every other block
- {
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- std::vector<IoHash> KeepChunks;
- for (size_t i = 0; i < 20; i += 2)
- {
- KeepChunks.push_back(ChunkHashes[i]);
- }
- GcCtx.ContributeCas(KeepChunks);
-
- ChunkBundlerStore.CollectGarbage(GcCtx);
-
- for (size_t i = 0; i < 20; i += 2)
- {
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[i]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i + 1]));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[i])));
- }
- }
- }
- {
- // Re-open
- ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
- ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, false);
- for (size_t i = 0; i < 20; i += 2)
- {
- CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[i]));
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i + 1]));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[i])));
- }
- }
-}
-
-TEST_CASE("chunkbundler.gc.handleopeniobuffer")
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(20);
- for (const auto& Size : ChunkSizes)
- {
- Chunks.push_back(CreateChunk(Size));
- }
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(20);
- for (const auto& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
- ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, true);
-
- for (size_t i = 0; i < 20; i++)
- {
- CHECK(ChunkBundlerStore.InsertChunk(Chunks[i], ChunkHashes[i]).New);
- }
-
- auto RetainChunk = ChunkBundlerStore.FindChunk(ChunkHashes[5]);
-
- // GC everything
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- ChunkBundlerStore.CollectGarbage(GcCtx);
-
- for (size_t i = 0; i < 20; i++)
- {
- CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i]));
- }
-
- CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk));
-}
-
-void
-chunkbundler_forcelink()
-{
-}
-
-} // namespace zen
diff --git a/zenstore/chunkbundler.h b/zenstore/chunkbundler.h
deleted file mode 100644
index d8913aec6..000000000
--- a/zenstore/chunkbundler.h
+++ /dev/null
@@ -1,105 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/zencore.h>
-
-#include <zenstore/basicfile.h>
-#include <zenstore/cas.h>
-#include <zenstore/caslog.h>
-#include <zenstore/gc.h>
-
-#include <atomic>
-#include <unordered_map>
-
-namespace spdlog {
-class logger;
-}
-
-namespace zen {
-
-class ChunkBundlerValidator
-{
-public:
- virtual bool ValidateChunk(IoBuffer Buffer, IoHash Key) = 0;
-};
-
-#pragma pack(push)
-#pragma pack(1)
-
-struct CompactDiskLocation
-{
- uint16_t BlockIndex;
- uint32_t Offset;
- uint32_t Size;
-};
-
-struct CompactDiskIndexEntry
-{
- static const uint8_t kTombstone = 0x01;
-
- IoHash Key;
- CompactDiskLocation Location;
- ZenContentType ContentType = ZenContentType::kUnknownContentType;
- uint8_t Flags = 0;
-};
-
-#pragma pack(pop)
-
-static_assert(sizeof(CompactDiskIndexEntry) == 32);
-
-class ChunkBundler final
-{
-public:
- ChunkBundler(std::filesystem::path RootDirectory, ChunkBundlerValidator* Validator);
- ~ChunkBundler();
-
- struct InsertResult
- {
- bool New = false;
- };
-
- void Initialize(const std::string_view ContainerBaseName, uint64_t MaxBlockSize, uint64_t Alignment, bool IsNewStore);
- InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
- InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
- IoBuffer FindChunk(const IoHash& ChunkHash);
- bool HaveChunk(const IoHash& ChunkHash);
- void FilterChunks(CasChunkSet& InOutChunks);
- void Flush();
- void Scrub(ScrubContext& Ctx);
- void CollectGarbage(GcContext& GcCtx);
- GcStorageSize StorageSize() const;
-
-private:
- spdlog::logger& Log() { return m_Log; }
-
- ChunkBundlerValidator* m_Validator;
- spdlog::logger& m_Log;
- uint64_t m_PayloadAlignment = 1 << 4;
- uint64_t m_MaxBlockSize = 1L << 30; // 1 Gb
- bool m_IsInitialized = false;
- std::filesystem::path RootDirectory;
- TCasLogFile<CompactDiskIndexEntry> m_OpLog;
- std::filesystem::path m_RootDirectory;
- std::string m_ContainerBaseName;
-
- RwLock m_LocationMapLock;
- std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher> m_LocationMap;
- std::unordered_map<uint16_t, std::shared_ptr<BasicFile>> m_OpenBlocks;
- uint16_t m_CurrentBlockIndex = 0;
-
- RwLock m_InsertLock; // used to serialize inserts
- std::weak_ptr<BasicFile> m_CurrentBlock;
- std::atomic_uint32_t m_CurrentInsertOffset{};
-
- std::atomic_uint64_t m_CurrentIndexOffset{};
- std::atomic_uint64_t m_TotalSize{};
-
- void MakeIndexSnapshot();
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-void chunkbundler_forcelink();
-
-} // namespace zen
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 91ee232fe..18dcc7fc8 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -2,27 +2,18 @@
#include <zenstore/cas.h>
-#include "compactcas.h"
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
-#include <zencore/memory.h>
-#include <zencore/string.h>
#include <zencore/testing.h>
-#include <zencore/testutils.h>
-#include <zencore/thread.h>
-#include <zencore/uid.h>
-
#include <zenstore/gc.h>
-
-#include <filesystem>
-#include <functional>
#include <gsl/gsl-lite.hpp>
+#include "compactcas.h"
#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/testutils.h>
+# include <zenstore/cidstore.h>
# include <algorithm>
# include <random>
#endif
@@ -31,11 +22,16 @@
namespace zen {
-static uint64_t
-AlignPositon(uint64_t Offset, uint64_t Alignment)
-{
- return (Offset + Alignment - 1) & ~(Alignment - 1);
-}
+namespace {
+ uint64_t AlignPositon(uint64_t Offset, uint64_t Alignment) { return (Offset + Alignment - 1) & ~(Alignment - 1); }
+
+ std::filesystem::path BuildUcasPath(const std::filesystem::path& RootDirectory,
+ const std::string_view ContainerBaseName,
+ const uint16_t BlockIndex)
+ {
+ return RootDirectory / (std::string(ContainerBaseName) + "." + (std::to_string(BlockIndex) + ".ucas"));
+ }
+} // namespace
CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
: GcStorage(Gc)
@@ -49,13 +45,15 @@ CasContainerStrategy::~CasContainerStrategy()
}
void
-CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore)
+CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore)
{
ZEN_ASSERT(IsPow2(Alignment));
ZEN_ASSERT(!m_IsInitialized);
+ ZEN_ASSERT(MaxBlockSize > 0);
m_ContainerBaseName = ContainerBaseName;
m_PayloadAlignment = Alignment;
+ m_MaxBlockSize = MaxBlockSize;
OpenContainer(IsNewStore);
@@ -79,11 +77,32 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
// New entry
- const uint64_t InsertOffset = m_CurrentInsertOffset;
- m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset);
- m_CurrentInsertOffset = AlignPositon(m_CurrentInsertOffset + ChunkSize, m_PayloadAlignment);
+ uint64_t CurrentBlockSize = m_CurrentBlock.lock()->FileSize();
+ if (CurrentBlockSize + m_CurrentInsertOffset > m_MaxBlockSize)
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ uint16_t NewBlockIndex = m_CurrentBlockIndex + 1;
+ while (m_OpenBlocks.contains(NewBlockIndex))
+ {
+ NewBlockIndex++;
+ if (NewBlockIndex == m_CurrentBlockIndex)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName));
+ }
+ }
+ m_CurrentBlockIndex = NewBlockIndex;
+ std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(path, true);
+ m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
+ m_CurrentBlock = SmallObjectFile;
+ m_CurrentInsertOffset = 0;
+ }
+ const uint32_t InsertOffset = m_CurrentInsertOffset;
+ m_CurrentBlock.lock()->Write(ChunkData, ChunkSize, InsertOffset);
+ m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment));
- const CasDiskLocation Location{InsertOffset, ChunkSize};
+ const CasDiskLocation Location{m_CurrentBlockIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)};
CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
RwLock::ExclusiveLockScope __(m_LocationMapLock);
@@ -109,7 +128,10 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
const CasDiskLocation& Location = KeyIt->second;
- return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize());
+ if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end())
+ {
+ return IoBufferBuilder::MakeFromFileHandle(BlockIt->second->Handle(), Location.Offset, Location.Size);
+ }
}
// Not found
@@ -121,13 +143,7 @@ bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
RwLock::SharedLockScope _(m_LocationMapLock);
-
- if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
- {
- return true;
- }
-
- return false;
+ return m_LocationMap.contains(ChunkHash);
}
void
@@ -146,18 +162,15 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
CasContainerStrategy::Flush()
{
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope _(m_InsertLock);
m_CasLog.Flush();
- m_SmallObjectFile.Flush();
+ m_CurrentBlock.lock()->Flush();
}
void
CasContainerStrategy::Scrub(ScrubContext& Ctx)
{
- const uint64_t WindowSize = 4 * 1024 * 1024;
- uint64_t WindowStart = 0;
- uint64_t WindowEnd = WindowSize;
- const uint64_t FileSize = m_SmallObjectFile.FileSize();
+ const uint64_t WindowSize = 4 * 1024 * 1024;
std::vector<CasDiskIndexEntry> BigChunks;
std::vector<CasDiskIndexEntry> BadChunks;
@@ -171,51 +184,61 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
IoBuffer ReadBuffer{WindowSize};
void* BufferBase = ReadBuffer.MutableData();
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time?
+ RwLock::SharedLockScope __(m_LocationMapLock);
- do
+ for (const auto& Block : m_OpenBlocks)
{
- const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
- m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart);
+ uint64_t WindowStart = 0;
+ uint64_t WindowEnd = WindowSize;
+ auto& SmallObjectFile = *Block.second;
+ const uint64_t FileSize = SmallObjectFile.FileSize();
- for (auto& Entry : m_LocationMap)
+ do
{
- const uint64_t EntryOffset = Entry.second.GetOffset();
+ const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
+ SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart);
- if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
+ for (auto& Entry : m_LocationMap)
{
- const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize();
+ const uint64_t EntryOffset = Entry.second.Offset;
- if (EntryEnd >= WindowEnd)
+ if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
{
- BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+ const uint64_t EntryEnd = EntryOffset + Entry.second.Size;
- continue;
- }
+ if (EntryEnd >= WindowEnd)
+ {
+ BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
- const IoHash ComputedHash =
- IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart,
- Entry.second.GetSize());
+ continue;
+ }
- if (Entry.first != ComputedHash)
- {
- // Hash mismatch
+ const IoHash ComputedHash =
+ IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart,
+ Entry.second.Size);
- BadChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+ if (Entry.first != ComputedHash)
+ {
+ // Hash mismatch
+
+ BadChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+ }
}
}
- }
- WindowStart += WindowSize;
- WindowEnd += WindowSize;
- } while (WindowStart < FileSize);
+ WindowStart += WindowSize;
+ WindowEnd += WindowSize;
+ } while (WindowStart < FileSize);
+ }
// Deal with large chunks
for (const CasDiskIndexEntry& Entry : BigChunks)
{
IoHashStream Hasher;
- m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) {
+ auto& SmallObjectFile = *m_OpenBlocks[Entry.Location.BlockIndex];
+ SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) {
Hasher.Append(Data, Size);
});
IoHash ComputedHash = Hasher.GetHash();
@@ -258,33 +281,48 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
{
namespace fs = std::filesystem;
- // Garbage collection will first remove any chunks that are flushed from the index.
- // It then tries to compact the existing small object file, it does this by
- // collecting all chunks that should be kept and sort them in position order.
- // It then steps from chunk to chunk and checks if there is space to move the last
- // chunk before the current chunk. It repeats this until it can't fit the last chunk
- // or the last chunk is the current chunk.
- // After this it check to see if there is space to move the current chunk closer to
- // the preceeding chunk (or beginning of file if there is no preceeding chunk).
- // It updates the new write position for any new chunks and rewrites the cas log
- // to match the new content of the store.
+ // It collects all the blocks that we want to delete chunks from. For each such
+ // block we keep a list of chunks to retain.
//
- // It currently grabs a full lock during the GC operation but the compacting is
- // done gradually and can be stopped after each chunk if the GC operation needs to
- // be time limited. This will leave holes in the small object file that will not
- // be reclaimed unless a GC operation is executed again, but the state of the
- // cas store is intact.
+ // It will first remove any chunks that are flushed from the m_LocationMap.
//
- // It is also possible to more fine-grained locking of GC operation when moving
- // blocks but that requires more work and additional checking if new blocks are
- // added betwen each move of a block.
+ // It then checks to see if we want to purge any chunks that are in the currently
+ // active block. If so, we break off the current block and start on a new block,
+ // otherwise we just let the active block be.
+ //
+ // Next it will iterate over all blocks that we want to remove chunks from.
+ // If the block is empty after removal of chunks we mark the block as pending
+ // delete - we want to delete it as soon as there are no IoBuffers using the
+ // block file.
+ //
+ // If the block is non-empty we write out the chunks we want to keep to a new
+ // block file (creating new block files as needed).
+ //
+ // We update the index as we complete each new block file. This makes it possible
+ // to break the GC if we want to limit time for execution.
+ //
+ // GC can run fairly parallel to regular operation - it will block while figuring
+ // out which chunks to remove and what blocks to rewrite but the actual
+ // reading and writing of data to new block files does not block regular operation.
+ //
+ // While moving blocks it will do a blocking operation and update the m_LocationMap
+ // after each new block is written and it will also block when figuring out the
+ // path to the next new block.
+
ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+ std::unordered_map<uint64_t, size_t> BlockIndexToKeepChunksMap;
+ std::vector<std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher>> KeepChunks;
+ std::vector<IoHash> DeletedChunks;
+ std::unordered_set<uint16_t> BlocksToReWrite;
{
RwLock::ExclusiveLockScope _i(m_InsertLock);
RwLock::ExclusiveLockScope _l(m_LocationMapLock);
- Flush();
+ m_CasLog.Flush();
+ m_CurrentBlock.lock()->Flush();
+
+ BlocksToReWrite.reserve(m_OpenBlocks.size());
if (m_LocationMap.empty())
{
@@ -297,21 +335,28 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
std::vector<IoHash> TotalChunkHashes;
TotalChunkHashes.reserve(m_LocationMap.size());
- for (auto& Entry : m_LocationMap)
+ for (const auto& Entry : m_LocationMap)
{
TotalChunkHashes.push_back(Entry.first);
+ if (BlockIndexToKeepChunksMap.contains(Entry.second.BlockIndex))
+ {
+ continue;
+ }
+ BlockIndexToKeepChunksMap[Entry.second.BlockIndex] = KeepChunks.size();
+ KeepChunks.resize(KeepChunks.size() + 1);
}
- std::vector<IoHash> DeletedChunks;
- std::vector<IoHash> ChunkHashes; // Same sort order as ChunkLocations
- ChunkHashes.reserve(m_LocationMap.size());
-
- const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ uint64_t NewTotalSize = 0;
GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
if (Keep)
{
- ChunkHashes.push_back(ChunkHash);
+ auto KeyIt = m_LocationMap.find(ChunkHash);
+ const auto& ChunkLocation = KeyIt->second;
+ auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[ChunkLocation.BlockIndex]];
+ ChunkMap[ChunkHash] = ChunkLocation;
+ NewTotalSize += ChunkLocation.Size;
}
else
{
@@ -319,162 +364,172 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
}
});
- if (ChunkHashes.size() == TotalChunkCount)
- {
- ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete",
- TotalChunkCount,
- NiceBytes(TotalSize),
- m_Config.RootDirectory / m_ContainerBaseName);
- return;
- }
-
- const uint64_t ChunkCount = ChunkHashes.size();
-
- std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
- auto LhsKeyIt = m_LocationMap.find(Lhs);
- auto RhsKeyIt = m_LocationMap.find(Rhs);
- return LhsKeyIt->second.GetOffset() < RhsKeyIt->second.GetOffset();
- });
-
- uint64_t NewTotalSize = 0;
- std::vector<CasDiskLocation> ChunkLocations;
- ChunkLocations.reserve(ChunkHashes.size());
- for (auto Entry : ChunkHashes)
- {
- auto KeyIt = m_LocationMap.find(Entry);
- const auto& ChunkLocation = KeyIt->second;
- ChunkLocations.push_back(ChunkLocation);
- NewTotalSize += ChunkLocation.GetSize();
- }
-
- if (!CollectSmallObjects)
+ if (!PerformDelete)
{
ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
m_Config.RootDirectory / m_ContainerBaseName,
- TotalChunkCount - ChunkCount,
+ DeletedChunks.size(),
NiceBytes(TotalSize - NewTotalSize),
TotalChunkCount,
NiceBytes(TotalSize));
return;
}
- for (auto ChunkHash : DeletedChunks)
+ for (const auto& ChunkHash : DeletedChunks)
{
- auto KeyIt = m_LocationMap.find(ChunkHash);
- const auto& ChunkLocation = KeyIt->second;
- uint64_t NextChunkOffset = ChunkLocation.GetOffset() + ChunkLocation.GetSize();
+ auto KeyIt = m_LocationMap.find(ChunkHash);
+ const auto& ChunkLocation = KeyIt->second;
+ BlocksToReWrite.insert(ChunkLocation.BlockIndex);
m_CasLog.Append({.Key = ChunkHash, .Location = ChunkLocation, .Flags = CasDiskIndexEntry::kTombstone});
m_LocationMap.erase(ChunkHash);
- if (m_CurrentInsertOffset == NextChunkOffset)
+ m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.Size));
+ }
+
+ if (BlocksToReWrite.contains(m_CurrentBlockIndex))
+ {
+ uint16_t NewBlockIndex = m_CurrentBlockIndex + 1;
+ while (m_OpenBlocks.contains(NewBlockIndex))
{
- m_CurrentInsertOffset = ChunkLocation.GetOffset();
+ NewBlockIndex++;
+ if (NewBlockIndex == m_CurrentBlockIndex)
+ {
+ ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded",
+ m_ContainerBaseName,
+ std::numeric_limits<uint16_t>::max() + 1);
+ return;
+ }
}
- m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.GetSize()));
+ m_CurrentBlockIndex = NewBlockIndex;
+ std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(path, true);
+ m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
+ m_CurrentBlock = SmallObjectFile;
+ m_CurrentInsertOffset = 0;
}
+ }
- // We can break here if we only want to remove items without compacting of space
+ // Move all chunks in blocks that have chunks removed to new blocks
- std::vector<IoHash> MovedChunks;
+ std::shared_ptr<BasicFile> NewBlockFile;
+ uint64_t WriteOffset = {};
+ uint16_t NewBlockIndex = {};
+ std::unordered_map<IoHash, CasDiskLocation> MovedBlocks;
- uint64_t WriteOffset{};
- uint64_t ChunkIndex{};
- while (ChunkIndex < ChunkHashes.size())
+ for (auto BlockIndex : BlocksToReWrite)
+ {
+ auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[BlockIndex]];
+ if (ChunkMap.empty())
{
- IoHash ChunkHash = ChunkHashes[ChunkIndex];
- const auto& ChunkLocation = ChunkLocations[ChunkIndex];
-
- uint64_t NextChunkOffset = AlignPositon(ChunkLocation.GetOffset() + ChunkLocation.GetSize(), m_PayloadAlignment);
+ // The block has no references to it; it should be removed as soon as no references are held on the file
+ // TODO: We currently don't know if someone is holding an IoBuffer for this block at this point!
+
+ // std::filesystem::path BlockPath = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, BlockIndex);
+ // RwLock::ExclusiveLockScope _i(m_InsertLock);
+ // auto BlockFile = m_OpenBlocks[BlockIndex];
+ // m_OpenBlocks.erase(BlockIndex);
+ // BlockFile->Close();
+ // fs::remove(BlockPath);
+ continue;
+ }
- uint64_t FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset;
+ std::shared_ptr<BasicFile> BlockFile;
+ {
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ BlockFile = m_OpenBlocks[BlockIndex];
+ }
- // TODO: We could keep some wiggle room here, only try to find the last keep block if there is a reasonable amount of space free
- while (FreeChunkSize >= m_PayloadAlignment)
+ {
+ std::vector<uint8_t> Chunk;
+ for (auto& Entry : ChunkMap)
{
- // We should move as many keep chunk at the end as we can possibly fit
- uint64_t LastKeepChunkIndex = ChunkHashes.size() - 1;
- if (LastKeepChunkIndex == ChunkIndex)
- {
- break;
- }
+ const CasDiskLocation& ChunkLocation = Entry.second;
+ Chunk.resize(ChunkLocation.Size);
+ BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
- IoHash LastChunkHash = ChunkHashes[LastKeepChunkIndex];
- const auto& LastChunkLocation = ChunkLocations[LastKeepChunkIndex];
- if (LastChunkLocation.GetSize() > FreeChunkSize)
+ if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
{
- break;
- }
-
- // Move the last chunk to our write location
- std::vector<uint8_t> Chunk;
- Chunk.resize(LastChunkLocation.GetSize());
- m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), LastChunkLocation.GetOffset());
- CasDiskLocation NewChunkLocation(WriteOffset, Chunk.size());
- m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset());
+ {
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ if (NewBlockFile)
+ {
+ m_OpenBlocks[NewBlockIndex] = NewBlockFile;
+ RwLock::ExclusiveLockScope _l(m_LocationMapLock);
+ for (const auto& MovedEntry : MovedBlocks)
+ {
+ m_LocationMap[MovedEntry.first] = MovedEntry.second;
+ m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
+ }
+ }
+ NewBlockIndex = m_CurrentBlockIndex + 1;
+ while (m_OpenBlocks.contains(NewBlockIndex))
+ {
+ NewBlockIndex++;
+ if (NewBlockIndex == m_CurrentBlockIndex)
+ {
+ ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded",
+ m_ContainerBaseName,
+ std::numeric_limits<uint16_t>::max() + 1);
+ return;
+ }
+ }
+ m_OpenBlocks[NewBlockIndex] = std::shared_ptr<BasicFile>(); // Make sure nobody steals this slot
+ }
- CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation};
- m_CasLog.Append(IndexEntry);
- m_LocationMap[LastChunkHash] = NewChunkLocation;
- ChunkHashes.pop_back();
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message());
+ return;
+ }
- WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment);
- FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset;
- MovedChunks.push_back(LastChunkHash);
+ if (Space.Free < (m_MaxBlockSize * 2)) // Never let GC steal the last block space
+ {
+ ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ m_MaxBlockSize * m_MaxBlockSize,
+ NiceBytes(Space.Free));
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ m_OpenBlocks.erase(NewBlockIndex);
+ return;
+ }
- uint64_t LastChunkNextChunkOffset = AlignPositon(LastChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment);
- if (m_CurrentInsertOffset == LastChunkNextChunkOffset)
- {
- m_CurrentInsertOffset = LastChunkLocation.GetOffset();
+ std::filesystem::path NewBlockPath = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, NewBlockIndex);
+ NewBlockFile = std::make_shared<BasicFile>();
+ NewBlockFile->Open(NewBlockPath, true);
+ MovedBlocks.clear();
+ WriteOffset = 0;
}
- }
- // TODO: We could keep some wiggle room here, don't move chunk if we only move it a very small amount
- if (FreeChunkSize > m_PayloadAlignment)
- {
- std::vector<uint8_t> Chunk;
- Chunk.resize(ChunkLocation.GetSize());
- m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), ChunkLocation.GetOffset());
- CasDiskLocation NewChunkLocation(WriteOffset, Chunk.size());
- m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset());
-
- CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation};
- m_CasLog.Append(IndexEntry);
- m_LocationMap[ChunkHash] = NewChunkLocation;
-
- MovedChunks.push_back(ChunkHash);
- WriteOffset = AlignPositon(NewChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment);
- }
- else
- {
- WriteOffset = NextChunkOffset;
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ CasDiskLocation NewChunkLocation(NewBlockIndex, gsl::narrow<uint32_t>(WriteOffset), gsl::narrow<uint32_t>(Chunk.size()));
+ Entry.second = {.BlockIndex = NewBlockIndex,
+ .Offset = gsl::narrow<uint32_t>(WriteOffset),
+ .Size = gsl::narrow<uint32_t>(Chunk.size())};
+ MovedBlocks[Entry.first] = Entry.second;
+ WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment);
}
+ Chunk.clear();
- // Update insert location if this is the last chunk in the file
- if (m_CurrentInsertOffset == NextChunkOffset)
+ // Remap moved chunks to the new block file
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ if (NewBlockFile)
{
- m_CurrentInsertOffset = WriteOffset;
+ m_OpenBlocks[NewBlockIndex] = NewBlockFile;
+ RwLock::ExclusiveLockScope _l(m_LocationMapLock);
+ for (const auto& MovedEntry : MovedBlocks)
+ {
+ m_LocationMap[MovedEntry.first] = MovedEntry.second;
+ m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
+ }
}
-
- // We can break here if we want to do incremental GC
-
- ChunkIndex++;
- }
-
- if (ChunkCount == 0)
- {
- m_CurrentInsertOffset = 0;
}
+ }
- GcCtx.DeletedCas(DeletedChunks);
+ GcCtx.DeletedCas(DeletedChunks);
- uint64_t CurrentSize = m_SmallObjectFile.FileSize();
- ZEN_INFO("garbage collection complete '{}', space {} to {}, moved {} and delete {} chunks",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(CurrentSize),
- NiceBytes(m_CurrentInsertOffset),
- MovedChunks.size(),
- DeletedChunks.size());
- // TODO: Should we truncate the file or just keep the size of the file and reuse the space?
- }
+ ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_Config.RootDirectory / m_ContainerBaseName, DeletedChunks.size());
MakeIndexSnapshot();
}
@@ -592,16 +647,11 @@ CasContainerStrategy::MakeIndexSnapshot()
void
CasContainerStrategy::OpenContainer(bool IsNewStore)
{
- std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas");
- std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
-
- m_SmallObjectFile.Open(SobsPath, IsNewStore);
- m_CasLog.Open(SlogPath, IsNewStore);
+ // TODO: Pick up old Cas store format so we can use it in our store
- // TODO: should validate integrity of container files here
+ std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
- m_CurrentInsertOffset = 0;
- m_TotalSize = 0;
+ m_TotalSize = 0;
m_LocationMap.clear();
@@ -621,6 +671,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
SmallObjectIndex.Close();
}
+ m_CasLog.Open(SlogPath, IsNewStore);
m_CasLog.Replay([&](const CasDiskIndexEntry& Record) {
if (Record.Flags & CasDiskIndexEntry::kTombstone)
{
@@ -632,28 +683,104 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
});
- uint64_t MaxFileOffset = 0;
+ std::unordered_set<uint16_t> ReferencedBlockIndexes;
for (const auto& Entry : m_LocationMap)
{
const auto& Location = Entry.second;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Location.GetOffset() + Location.GetSize());
- m_TotalSize.fetch_add(Location.GetSize());
+ m_TotalSize.fetch_add(Location.Size);
+ ReferencedBlockIndexes.insert(Location.BlockIndex);
+ }
+
+ uint32_t SmallestBlockSize = 0xffffffffu;
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.RootDirectory))
+ {
+ if (Entry.is_regular_file())
+ {
+ // TODO: Clean up naming/storage structure so we don't have to do this complicated parsing to find our ucas files
+ if (Entry.path().extension() != ".ucas")
+ {
+ continue;
+ }
+ std::string FileName = Entry.path().stem().string();
+ if (!FileName.starts_with(m_ContainerBaseName))
+ {
+ continue;
+ }
+ if (IsNewStore)
+ {
+ std::filesystem::remove(Entry.path());
+ continue;
+ }
+ try
+ {
+ uint16_t BlockIndex = static_cast<uint16_t>(std::stoi(FileName.substr(m_ContainerBaseName.length() + 1)));
+ if (!ReferencedBlockIndexes.contains(BlockIndex))
+ {
+ // Clear out unused blocks
+ std::filesystem::remove(Entry.path());
+ continue;
+ }
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(Entry.path(), false);
+ m_OpenBlocks[BlockIndex] = SmallObjectFile;
+ if (SmallObjectFile->FileSize() < SmallestBlockSize)
+ {
+ m_CurrentBlockIndex = BlockIndex;
+ SmallestBlockSize = gsl::narrow<std::uint32_t>(SmallObjectFile->FileSize());
+ }
+ }
+ catch (const std::invalid_argument&)
+ {
+ // Non-valid file, skip it (or should we remove it?)
+ }
+ }
+ }
+ if (m_OpenBlocks.empty())
+ {
+ std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(path, true);
+ m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
+ m_CurrentBlock = SmallObjectFile;
+ m_CurrentInsertOffset = 0;
+ }
+ else
+ {
+ m_CurrentBlock = m_OpenBlocks[m_CurrentBlockIndex];
+ m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment));
}
- m_CurrentInsertOffset = AlignPositon(MaxFileOffset, m_PayloadAlignment);
+ // TODO: should validate integrity of container files here
}
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
+namespace {  // File-local helpers shared by the test cases below.
+ static IoBuffer CreateChunk(uint64_t Size)  // Test helper: returns an IoBuffer holding Size bytes in shuffled order.
+ {
+ static std::random_device rd;
+ static std::mt19937 g(rd());  // Function-local statics: one generator, seeded once from rd and reused by every call.
+
+ std::vector<uint8_t> Values;
+ Values.resize(Size);
+ for (size_t Idx = 0; Idx < Size; ++Idx)
+ {
+ Values[Idx] = static_cast<uint8_t>(Idx);  // Index truncated to 8 bits, so the values repeat every 256 bytes.
+ }
+ std::shuffle(Values.begin(), Values.end(), g);  // Seeded from std::random_device, so the byte order generally differs between runs.
+
+ return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());  // Values is destroyed on return; presumably the buffer holds its own copy (per the "Clone" name) - confirm.
+ }
+} // namespace
+
TEST_CASE("cas.compact.gc")
{
ScopedTemporaryDirectory TempDir;
CasStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path();
-
CreateDirectories(CasConfig.RootDirectory);
const int kIterationCount = 1000;
@@ -663,7 +790,7 @@ TEST_CASE("cas.compact.gc")
{
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, true);
+ Cas.Initialize("test", 65536, 16, true);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -697,7 +824,7 @@ TEST_CASE("cas.compact.gc")
{
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, false);
+ Cas.Initialize("test", 65536, 16, false);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -720,19 +847,6 @@ TEST_CASE("cas.compact.totalsize")
std::random_device rd;
std::mt19937 g(rd());
- const auto CreateChunk = [&](uint64_t Size) -> IoBuffer {
- const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t));
- std::vector<uint32_t> Values;
- Values.resize(Count);
- for (size_t Idx = 0; Idx < Count; ++Idx)
- {
- Values[Idx] = static_cast<uint32_t>(Idx);
- }
- std::shuffle(Values.begin(), Values.end(), g);
-
- return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t));
- };
-
ScopedTemporaryDirectory TempDir;
CasStoreConfiguration CasConfig;
@@ -746,7 +860,7 @@ TEST_CASE("cas.compact.totalsize")
{
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, true);
+ Cas.Initialize("test", 65536, 16, true);
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
@@ -763,13 +877,385 @@ TEST_CASE("cas.compact.totalsize")
{
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, false);
+ Cas.Initialize("test", 65536, 16, false);
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
}
}
+TEST_CASE("cas.gc.basic")  // A chunk that is never contributed to the GC context must be gone after CollectGarbage.
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("cb", 65536, 1 << 4, true);  // Arguments: ContainerBaseName, MaxBlockSize, Alignment, IsNewStore.
+
+ IoBuffer Chunk = CreateChunk(128);
+ IoHash ChunkHash = IoHash::HashBuffer(Chunk);
+
+ const auto InsertResult = Cas.InsertChunk(Chunk, ChunkHash);  // Result is not inspected in this test.
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);  // No ContributeCas call follows: nothing is registered to be kept.
+
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHash));  // The only stored chunk was unreferenced, so it must no longer be reported.
+}
+
+TEST_CASE("cas.gc.compact")  // Runs several insert/GC rounds with different keep-sets; kept chunks must stay present and hash-identical.
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("cb", 2048, 1 << 4, true);  // MaxBlockSize = 2048, below the 3734 bytes of chunk data used here; presumably meant to spread chunks over several blocks - confirm against InsertChunk.
+
+ uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};  // 3734 bytes in total.
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(9);
+ for (const auto& Size : ChunkSizes)
+ {
+ Chunks.push_back(CreateChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(9);
+ for (const auto& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New);  // Every first-time insert must be reported as new.
+ CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New);
+ CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New);
+ CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New);
+ CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New);
+ CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New);
+ CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New);
+ CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New);
+ CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ auto InitialSize = Cas.StorageSize().DiskSize;  // Baseline for the size comparison at the end of the test.
+
+ // Keep first and last
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[0]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);  // Hashes passed here are the ones the checks below expect to survive.
+
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));  // Survivors must read back with their original content hash.
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+ }
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);  // Put back chunks 1-7 so that all nine are stored again.
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+
+ // Keep last
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);  // Put back chunks 1-7 only; chunk 0 stays absent going into the next round.
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
+ // Keep mixed
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[1]);
+ KeepChunks.push_back(ChunkHashes[4]);
+ KeepChunks.push_back(ChunkHashes[7]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+
+ Cas.InsertChunk(Chunks[0], ChunkHashes[0]);  // Put back the six missing chunks so that all nine are stored again.
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[8], ChunkHashes[8]);
+ }
+
+ // Keep multiple at end
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[6]);
+ KeepChunks.push_back(ChunkHashes[7]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[0], ChunkHashes[0]);  // Put back chunks 0-5 so that all nine are stored again.
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ }
+
+ // Keep every other
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[0]);
+ KeepChunks.push_back(ChunkHashes[2]);
+ KeepChunks.push_back(ChunkHashes[4]);
+ KeepChunks.push_back(ChunkHashes[6]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);  // Put back the odd-indexed chunks so that all nine are stored again.
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
+ // Verify that we nicely appended blocks even after all GC operations
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
+ CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
+ CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5])));
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ auto FinalSize = Cas.StorageSize().DiskSize;
+ CHECK(InitialSize == FinalSize);  // The same nine chunks are stored as when the baseline was taken, so the reported size must match.
+}
+
+TEST_CASE("cas.gc.deleteblockonopen")  // GC every other chunk, destroy the store object, re-open it with IsNewStore = false and verify the kept chunks are intact.
+{
+ ScopedTemporaryDirectory TempDir;
+
+ uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};  // 541 and 92 repeat; the chunks still get distinct contents because CreateChunk shuffles each buffer separately.
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(20);
+ for (const auto& Size : ChunkSizes)
+ {
+ Chunks.push_back(CreateChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(20);
+ for (const auto& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 1024, 16, true);  // Arguments: ContainerBaseName, MaxBlockSize, Alignment, IsNewStore.
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
+ }
+
+ // GC every other block
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ for (size_t i = 0; i < 20; i += 2)  // Keep the even-indexed chunks only.
+ {
+ KeepChunks.push_back(ChunkHashes[i]);
+ }
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.CollectGarbage(GcCtx);
+
+ for (size_t i = 0; i < 20; i += 2)
+ {
+ CHECK(Cas.HaveChunk(ChunkHashes[i]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));  // i + 1 is at most 19, so this stays in range.
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
+ }
+ }
+ }  // Cas and Gc are destroyed here, before the store is opened a second time.
+ {
+ // Re-open
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 1024, 16, false);  // IsNewStore = false. NOTE(review): the test name suggests it targets the removal of unreferenced block files on open, but nothing here asserts on files directly.
+
+ for (size_t i = 0; i < 20; i += 2)  // Same expectations as before the re-open.
+ {
+ CHECK(Cas.HaveChunk(ChunkHashes[i]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
+ }
+ }
+}
+
+TEST_CASE("cas.gc.handleopeniobuffer")  // An IoBuffer obtained from FindChunk before GC must still hash correctly after every chunk has been collected.
+{
+ ScopedTemporaryDirectory TempDir;
+
+ uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(20);
+ for (const auto& Size : ChunkSizes)
+ {
+ Chunks.push_back(CreateChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(20);
+ for (const auto& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 1024, 16, true);  // Arguments: ContainerBaseName, MaxBlockSize, Alignment, IsNewStore.
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
+ }
+
+ auto RetainChunk = Cas.FindChunk(ChunkHashes[5]);  // Held across the GC below.
+
+ // GC everything
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);  // No ContributeCas call follows: nothing is registered to be kept.
+ Cas.CollectGarbage(GcCtx);
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(!Cas.HaveChunk(ChunkHashes[i]));
+ }
+
+ CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk));  // The store no longer reports chunk 5, yet the buffer taken earlier must still hold its content.
+}
#endif
void
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index 204482534..35f118f34 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -3,21 +3,10 @@
#pragma once
#include <zencore/zencore.h>
-
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/string.h>
-#include <zencore/thread.h>
-#include <zencore/uid.h>
#include <zenstore/basicfile.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
-#if ZEN_PLATFORM_WINDOWS
-# include <zencore/windows.h>
-#endif
-
#include <atomic>
#include <unordered_map>
@@ -34,34 +23,9 @@ namespace zen {
struct CasDiskLocation
{
- CasDiskLocation(uint64_t InOffset, uint64_t InSize)
- {
- ZEN_ASSERT(InOffset <= 0xff'ffff'ffff);
- ZEN_ASSERT(InSize <= 0xff'ffff'ffff);
-
- memcpy(&m_Offset[0], &InOffset, sizeof m_Offset);
- memcpy(&m_Size[0], &InSize, sizeof m_Size);
- }
-
- CasDiskLocation() = default;
-
- inline uint64_t GetOffset() const
- {
- uint64_t Offset = 0;
- memcpy(&Offset, &m_Offset, sizeof m_Offset);
- return Offset;
- }
-
- inline uint64_t GetSize() const
- {
- uint64_t Size = 0;
- memcpy(&Size, &m_Size, sizeof m_Size);
- return Size;
- }
-
-private:
- uint8_t m_Offset[5];
- uint8_t m_Size[5];
+ uint16_t BlockIndex;  // Which block file holds the chunk; OpenContainer uses it as the key into m_OpenBlocks.
+ uint32_t Offset;  // Position of the chunk within that block file (set from the write offset in CollectGarbage).
+ uint32_t Size;  // Chunk size in bytes; OpenContainer sums it into m_TotalSize.
};
struct CasDiskIndexEntry
@@ -96,7 +60,7 @@ struct CasContainerStrategy final : public GcStorage
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(CasChunkSet& InOutChunks);
- void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore);
+ void Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore);
void Flush();
void Scrub(ScrubContext& Ctx);
virtual void CollectGarbage(GcContext& GcCtx) override;
@@ -109,16 +73,20 @@ private:
const CasStoreConfiguration& m_Config;
spdlog::logger& m_Log;
uint64_t m_PayloadAlignment = 1 << 4;
+ uint64_t m_MaxBlockSize = 1L << 30; // 1 Gb
bool m_IsInitialized = false;
- BasicFile m_SmallObjectFile;
TCasLogFile<CasDiskIndexEntry> m_CasLog;
std::string m_ContainerBaseName;
RwLock m_LocationMapLock;
std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> m_LocationMap;
- RwLock m_InsertLock; // used to serialize inserts
- std::atomic_uint64_t m_CurrentInsertOffset{};
- std::atomic_uint64_t m_TotalSize{};
+
+ RwLock m_InsertLock; // used to serialize inserts
+ std::unordered_map<uint16_t, std::shared_ptr<BasicFile>> m_OpenBlocks;
+ std::weak_ptr<BasicFile> m_CurrentBlock;
+ uint16_t m_CurrentBlockIndex = 0;
+ std::atomic_uint32_t m_CurrentInsertOffset{};
+ std::atomic_uint64_t m_TotalSize{};
void MakeIndexSnapshot();
};