diff options
| author | Dan Engelbrecht <[email protected]> | 2022-03-14 14:00:23 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-03-31 11:28:31 +0200 |
| commit | 29b610eae28d53fac844936116453a52970a2397 (patch) | |
| tree | b732160900bcba7e81f31b8f7434e3d8c976251c | |
| parent | WIP (diff) | |
| download | zen-29b610eae28d53fac844936116453a52970a2397.tar.xz zen-29b610eae28d53fac844936116453a52970a2397.zip | |
WIP
| -rw-r--r-- | zenstore/chunkbundler.cpp | 1008 | ||||
| -rw-r--r-- | zenstore/chunkbundler.h | 126 | ||||
| -rw-r--r-- | zenstore/zenstore.cpp | 2 |
3 files changed, 1136 insertions, 0 deletions
diff --git a/zenstore/chunkbundler.cpp b/zenstore/chunkbundler.cpp new file mode 100644 index 000000000..ca5066e33 --- /dev/null +++ b/zenstore/chunkbundler.cpp @@ -0,0 +1,1008 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "chunkbundler.h" + +#include <zencore/except.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/logging.h> +#include <zencore/testing.h> + +#if ZEN_WITH_TESTS +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compress.h> +# include <zencore/filesystem.h> +# include <zencore/testutils.h> +# include <algorithm> +# include <random> +#endif + +namespace zen { + +static uint64_t +AlignPositon(uint64_t Offset, uint64_t Alignment) +{ + return (Offset + Alignment - 1) & ~(Alignment - 1); +} + +template<typename T> +class reverse +{ +private: + T& iterable_; + +public: + explicit reverse(T& iterable) : iterable_{iterable} {} + auto begin() const { return std::rbegin(iterable_); } + auto end() const { return std::rend(iterable_); } +}; + +ChunkBundler::ChunkBundler(std::filesystem::path RootDirectory, ChunkBundlerValidator* Validator) +: m_Validator(Validator) +, m_Log(logging::Get("chunkbundler")) +, m_RootDirectory(RootDirectory) +{ +} + +ChunkBundler::~ChunkBundler() +{ +} + +void +ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore) +{ + ZEN_ASSERT(IsPow2(Alignment)); + ZEN_ASSERT(!m_IsInitialized); + + m_ContainerBaseName = ContainerBaseName; + m_PayloadAlignment = Alignment; + + std::filesystem::path SobsPath = m_RootDirectory / (m_ContainerBaseName + ".ucas"); + std::filesystem::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog"); + + m_SmallObjectFile.Open(SobsPath, IsNewStore); + m_OpLog.Open(SlogPath, IsNewStore); + + // TODO: should validate integrity of container files here + + m_CurrentInsertOffset = 0; + m_TotalSize = 0; + + m_LocationMap.clear(); + + std::filesystem::path 
SidxPath = m_RootDirectory / (m_ContainerBaseName + ".uidx"); + if (std::filesystem::exists(SidxPath)) + { + BasicFile SmallObjectIndex; + SmallObjectIndex.Open(SidxPath, false); + uint64_t Size = SmallObjectIndex.FileSize(); + uint64_t EntryCount = Size / sizeof(CompactDiskIndexEntry); + std::vector<CompactDiskIndexEntry> Entries{EntryCount}; + SmallObjectIndex.Read(Entries.data(), Size, 0); + for (const auto& Entry : Entries) + { + m_LocationMap[Entry.Key] = Entry.Location; + } + SmallObjectIndex.Close(); + } + + m_OpLog.Replay([&](const CompactDiskIndexEntry& Record) { + if (Record.Flags & CompactDiskIndexEntry::kTombstone) + { + m_LocationMap.erase(Record.Key); + } + else + { + m_LocationMap[Record.Key] = Record.Location; + } + }); + + uint64_t MaxFileOffset = 0; + for (const auto& Entry : m_LocationMap) + { + const auto& Location = Entry.second; + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Location.GetOffset() + Location.GetSize()); + m_TotalSize.fetch_add(Location.GetSize()); + } + + m_CurrentInsertOffset = AlignPositon(MaxFileOffset, m_PayloadAlignment); + + m_IsInitialized = true; +} + +ChunkBundler::InsertResult +ChunkBundler::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) +{ + RwLock::ExclusiveLockScope _i(m_InsertLock); + + { + RwLock::SharedLockScope _l(m_LocationMapLock); + auto KeyIt = m_LocationMap.find(ChunkHash); + + if (KeyIt != m_LocationMap.end()) + { + return InsertResult{.New = false}; + } + } + + // New entry + + const uint64_t InsertOffset = m_CurrentInsertOffset; + m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset); + m_CurrentInsertOffset = AlignPositon(m_CurrentInsertOffset + ChunkSize, m_PayloadAlignment); + + const CompactDiskLocation Location{InsertOffset, ChunkSize}; + CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; + + RwLock::ExclusiveLockScope __(m_LocationMapLock); + m_LocationMap[ChunkHash] = Location; + 
m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize)); + m_OpLog.Append(IndexEntry); + + return InsertResult{.New = true}; +} + +ChunkBundler::InsertResult +ChunkBundler::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) +{ + return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); +} + +IoBuffer +ChunkBundler::FindChunk(const IoHash& ChunkHash) +{ + RwLock::SharedLockScope _(m_LocationMapLock); + + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) + { + const CompactDiskLocation& Location = KeyIt->second; + + return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize()); + } + + // Not found + + return IoBuffer(); +} + +bool +ChunkBundler::HaveChunk(const IoHash& ChunkHash) +{ + RwLock::SharedLockScope _(m_LocationMapLock); + + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) + { + return true; + } + + return false; +} + +void +ChunkBundler::FilterChunks(CasChunkSet& InOutChunks) +{ + // This implementation is good enough for relatively small + // chunk sets (in terms of chunk identifiers), but would + // benefit from a better implementation which removes + // items incrementally for large sets, especially when + // we're likely to already have a large proportion of the + // chunks in the set + + InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); +} + +void +ChunkBundler::Flush() +{ + m_OpLog.Flush(); + m_SmallObjectFile.Flush(); +} + +void +ChunkBundler::Scrub(ScrubContext& Ctx) +{ + const uint64_t WindowSize = 4 * 1024 * 1024; + uint64_t WindowStart = 0; + uint64_t WindowEnd = WindowSize; + const uint64_t FileSize = m_SmallObjectFile.FileSize(); + + std::vector<CompactDiskIndexEntry> BigChunks; + std::vector<CompactDiskIndexEntry> BadChunks; + + // We do a read sweep through the payloads file and validate + // any entries that are contained within each segment, with + // the assumption that most entries will be checked 
in this + // pass. An alternative strategy would be to use memory mapping. + + { + IoBuffer ReadBuffer{WindowSize}; + void* BufferBase = ReadBuffer.MutableData(); + + RwLock::SharedLockScope _(m_LocationMapLock); + + do + { + const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); + m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); + + for (auto& Entry : m_LocationMap) + { + const uint64_t EntryOffset = Entry.second.GetOffset(); + + if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) + { + const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize(); + + if (EntryEnd >= WindowEnd) + { + BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + + continue; + } + + if (m_Validator && !m_Validator->ValidateChunk( + IoBuffer(IoBuffer::Wrap, + reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart, + Entry.second.GetSize()), + Entry.first)) + { + // Hash mismatch + BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + } + } + } + + WindowStart += WindowSize; + WindowEnd += WindowSize; + } while (WindowStart < FileSize); +#if 0 + // TODO: Figure out API for this + // Deal with large chunks + for (const auto& Entry : BigChunks) + { + IoHashStream Hasher; + m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { + Hasher.Append(Data, Size); + }); + IoHash ComputedHash = Hasher.GetHash(); + + if (Entry.Key != ComputedHash) + { + BadChunks.push_back(Entry); + } + } +#endif + } + + if (BadChunks.empty()) + { + return; + } + + ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName); + + // Deal with bad chunks by removing them from our lookup map + + std::vector<IoHash> BadChunkHashes; + + RwLock::ExclusiveLockScope _(m_LocationMapLock); + for (const CompactDiskIndexEntry& Entry : BadChunks) + { + BadChunkHashes.push_back(Entry.Key); + m_OpLog.Append({.Key = Entry.Key, .Location = 
Entry.Location, .Flags = CompactDiskIndexEntry::kTombstone}); + m_LocationMap.erase(Entry.Key); + } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + + Ctx.ReportBadCasChunks(BadChunkHashes); +} + +void +ChunkBundler::CollectGarbage(GcContext& GcCtx) +{ + namespace fs = std::filesystem; + + // Garbage collection will first remove any chunks that are flushed from the index. + // It then tries to compact the existing small object file, it does this by + // collecting all chunks that should be kept and sorting them in position order. + // It then steps from chunk to chunk and checks if there is space to move the last + // chunk before the current chunk. It repeats this until it can't fit the last chunk + // or the last chunk is the current chunk. + // After this it checks to see if there is space to move the current chunk closer to + // the preceding chunk (or beginning of file if there is no preceding chunk). + // It updates the new write position for any new chunks and rewrites the cas log + // to match the new content of the store. + // + // It currently grabs a full lock during the GC operation but the compacting is + // done gradually and can be stopped after each chunk if the GC operation needs to + // be time limited. This will leave holes in the small object file that will not + // be reclaimed unless a GC operation is executed again, but the state of the + // cas store is intact. + // + // It is also possible to use more fine-grained locking for the GC operation when moving + // blocks but that requires more work and additional checking if new blocks are + // added between each move of a block. 
+ ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); + + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + + Flush(); + + if (m_LocationMap.empty()) + { + ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_RootDirectory / m_ContainerBaseName); + return; + } + + const uint64_t TotalChunkCount = m_LocationMap.size(); + uint64_t TotalSize = m_TotalSize.load(); + + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(m_LocationMap.size()); + for (auto& Entry : m_LocationMap) + { + TotalChunkHashes.push_back(Entry.first); + } + + std::vector<IoHash> DeletedChunks; + std::vector<IoHash> ChunkHashes; // Same sort order as ChunkLocations + ChunkHashes.reserve(m_LocationMap.size()); + + const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + + GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + if (Keep) + { + ChunkHashes.push_back(ChunkHash); + } + else + { + DeletedChunks.push_back(ChunkHash); + } + }); + + if (ChunkHashes.size() == TotalChunkCount) + { + ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete", + TotalChunkCount, + NiceBytes(TotalSize), + m_RootDirectory / m_ContainerBaseName); + return; + } + + const uint64_t ChunkCount = ChunkHashes.size(); + + std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { + auto LhsKeyIt = m_LocationMap.find(Lhs); + auto RhsKeyIt = m_LocationMap.find(Rhs); + return LhsKeyIt->second.GetOffset() < RhsKeyIt->second.GetOffset(); + }); + + uint64_t NewTotalSize = 0; + std::vector<CompactDiskLocation> ChunkLocations; + ChunkLocations.reserve(ChunkHashes.size()); + for (auto Entry : ChunkHashes) + { + auto KeyIt = m_LocationMap.find(Entry); + const auto& ChunkLocation = KeyIt->second; + ChunkLocations.push_back(ChunkLocation); + NewTotalSize += ChunkLocation.GetSize(); + } + + if (!CollectSmallObjects) + { + 
ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_RootDirectory / m_ContainerBaseName, + TotalChunkCount - ChunkCount, + NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); + return; + } + + for (auto ChunkHash : reverse(DeletedChunks)) + { + auto KeyIt = m_LocationMap.find(ChunkHash); + const auto& ChunkLocation = KeyIt->second; + uint64_t NextChunkOffset = ChunkLocation.GetOffset() + ChunkLocation.GetSize(); + m_OpLog.Append({.Key = ChunkHash, .Location = ChunkLocation, .Flags = CompactDiskIndexEntry::kTombstone}); + m_LocationMap.erase(ChunkHash); + if (m_CurrentInsertOffset == NextChunkOffset) + { + m_CurrentInsertOffset = ChunkLocation.GetOffset(); + } + m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.GetSize())); + } + + // We can break here if we only want to remove items without compacting of space + + std::vector<IoHash> MovedChunks; + + uint64_t WriteOffset{}; + uint64_t ChunkIndex{}; + while (ChunkIndex < ChunkHashes.size()) + { + IoHash ChunkHash = ChunkHashes[ChunkIndex]; + const auto& ChunkLocation = ChunkLocations[ChunkIndex]; + + uint64_t NextChunkOffset = AlignPositon(ChunkLocation.GetOffset() + ChunkLocation.GetSize(), m_PayloadAlignment); + + uint64_t FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset; + + // TODO: We could keep some wiggle room here, only try to find the last keep block if there is a reasonable amount of space free + while (FreeChunkSize >= m_PayloadAlignment) + { + // We should move as many keep chunk at the end as we can possibly fit + uint64_t LastKeepChunkIndex = ChunkHashes.size() - 1; + if (LastKeepChunkIndex == ChunkIndex) + { + break; + } + + IoHash LastChunkHash = ChunkHashes[LastKeepChunkIndex]; + const auto& LastChunkLocation = ChunkLocations[LastKeepChunkIndex]; + if (LastChunkLocation.GetSize() > FreeChunkSize) + { + break; + } + + // Move the last chunk to our write location + std::vector<uint8_t> Chunk; + 
Chunk.resize(LastChunkLocation.GetSize()); + m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), LastChunkLocation.GetOffset()); + CompactDiskLocation NewChunkLocation(WriteOffset, Chunk.size()); + m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset()); + + CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation}; + m_OpLog.Append(IndexEntry); + m_LocationMap[LastChunkHash] = NewChunkLocation; + ChunkHashes.pop_back(); + + WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment); + FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset; + MovedChunks.push_back(LastChunkHash); + + uint64_t LastChunkNextChunkOffset = AlignPositon(LastChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment); + if (m_CurrentInsertOffset == LastChunkNextChunkOffset) + { + m_CurrentInsertOffset = LastChunkLocation.GetOffset(); + } + } + + // TODO: We could keep some wiggle room here, don't move chunk if we only move it a very small amount + if (FreeChunkSize > m_PayloadAlignment) + { + std::vector<uint8_t> Chunk; + Chunk.resize(ChunkLocation.GetSize()); + m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), ChunkLocation.GetOffset()); + CompactDiskLocation NewChunkLocation(WriteOffset, Chunk.size()); + m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset()); + + CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation}; + m_OpLog.Append(IndexEntry); + m_LocationMap[ChunkHash] = NewChunkLocation; + + MovedChunks.push_back(ChunkHash); + WriteOffset = AlignPositon(NewChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment); + } + else + { + WriteOffset = NextChunkOffset; + } + + // Update insert location if this is the last chunk in the file + if (m_CurrentInsertOffset == NextChunkOffset) + { + m_CurrentInsertOffset = WriteOffset; + } + + // We can break here if we want to do incremental GC + + ChunkIndex++; + } + + if (ChunkCount == 0) + { + m_CurrentInsertOffset = 
0; + } + + GcCtx.DeletedCas(DeletedChunks); + + uint64_t CurrentSize = m_SmallObjectFile.FileSize(); + ZEN_INFO("garbage collection complete '{}', space {} to {}, moved {} and delete {} chunks", + m_RootDirectory / m_ContainerBaseName, + NiceBytes(CurrentSize), + NiceBytes(m_CurrentInsertOffset), + MovedChunks.size(), + DeletedChunks.size()); + // TODO: Should we truncate the file or just keep the size of the file and reuse the space? + } + + MakeIndexSnapshot(); +} + +void +ChunkBundler::MakeIndexSnapshot() +{ + ZEN_INFO("writing index snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); + + namespace fs = std::filesystem; + + fs::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog"); + fs::path SidxPath = m_RootDirectory / (m_ContainerBaseName + ".uidx"); + fs::path STmplogPath = m_RootDirectory / (m_ContainerBaseName + ".tmp.ulog"); + fs::path STmpSidxPath = m_RootDirectory / (m_ContainerBaseName + ".tmp.uidx"); + fs::path SRecoveredlogPath = m_RootDirectory / (m_ContainerBaseName + ".recover.ulog"); + + // Move log and index away, we keep them if something goes wrong, any new chunks will be added to the new log + { + RwLock::ExclusiveLockScope _(m_LocationMapLock); + m_OpLog.Close(); + + if (fs::exists(STmplogPath)) + { + fs::remove(STmplogPath); + } + if (fs::exists(STmpSidxPath)) + { + fs::remove(STmpSidxPath); + } + + fs::rename(SlogPath, STmplogPath); + if (fs::exists(SidxPath)) + { + fs::rename(SidxPath, STmpSidxPath); + } + + // Open an new log + m_OpLog.Open(SlogPath, true); + } + + try + { + // Write the current state of the location map to a new index state + std::vector<CompactDiskIndexEntry> Entries; + + { + RwLock::SharedLockScope _l(m_LocationMapLock); + Entries.resize(m_LocationMap.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_LocationMap) + { + CompactDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second; + } + } + + BasicFile SmallObjectIndex; + 
SmallObjectIndex.Open(SidxPath, true); + SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CompactDiskIndexEntry), 0); + SmallObjectIndex.Close(); + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason '{}'", Err.what()); + + // Reconstruct the log from old log and any added log entries + RwLock::ExclusiveLockScope _(m_LocationMapLock); + if (fs::exists(STmplogPath)) + { + std::vector<CompactDiskIndexEntry> Records; + Records.reserve(m_LocationMap.size()); + { + TCasLogFile<CompactDiskIndexEntry> OldOpLog; + OldOpLog.Open(STmplogPath, false); + OldOpLog.Replay([&](const CompactDiskIndexEntry& Record) { Records.push_back(Record); }); + } + { + m_OpLog.Replay([&](const CompactDiskIndexEntry& Record) { Records.push_back(Record); }); + } + + TCasLogFile<CompactDiskIndexEntry> RecoveredOpLog; + RecoveredOpLog.Open(SRecoveredlogPath, true); + for (const auto& Record : Records) + { + RecoveredOpLog.Append(Record); + } + RecoveredOpLog.Close(); + + fs::remove(SlogPath); + fs::rename(SRecoveredlogPath, SlogPath); + fs::remove(STmplogPath); + } + + if (fs::exists(SidxPath)) + { + fs::remove(SidxPath); + } + + // Restore any previous snapshot + if (fs::exists(STmpSidxPath)) + { + fs::remove(SidxPath); + fs::rename(STmpSidxPath, SidxPath); + } + } + if (fs::exists(STmpSidxPath)) + { + fs::remove(STmpSidxPath); + } +} + +GcStorageSize +ChunkBundler::StorageSize() const +{ + return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +namespace { + static IoBuffer CreateChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } +} 
// namespace + +#endif + +TEST_CASE("chunkbundler.reopen") +{ + ScopedTemporaryDirectory TempDir; + + CreateDirectories(TempDir.Path()); + + const int kIterationCount = 1000; + + std::vector<IoHash> Keys(kIterationCount); + + { + ChunkBundler Store(TempDir.Path(), nullptr); + Store.Initialize("test", 16, true); + + for (int i = 0; i < kIterationCount; ++i) + { + CbObjectWriter Cbo; + Cbo << "id" << i; + CbObject Obj = Cbo.Save(); + + IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); + const IoHash Hash = HashBuffer(ObjBuffer); + + Store.InsertChunk(ObjBuffer, Hash); + + Keys[i] = Hash; + } + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Store.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + } + + // Validate that we can still read the inserted data after closing + // the original store + + { + ChunkBundler Store(TempDir.Path(), nullptr); + Store.Initialize("test", 16, false); + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Store.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + + GcContext Ctx; + Store.CollectGarbage(Ctx); + } +} + +TEST_CASE("chunkbundler.totalsize") +{ + std::random_device rd; + std::mt19937 g(rd()); + + const auto CreateChunk = [&](uint64_t Size) -> IoBuffer { + const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t)); + std::vector<uint32_t> Values; + Values.resize(Count); + for (size_t Idx = 0; Idx < Count; ++Idx) + { + Values[Idx] = static_cast<uint32_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t)); + }; + + ScopedTemporaryDirectory TempDir; + + CreateDirectories(TempDir.Path()); + + const uint64_t kChunkSize = 1024; + const int32_t kChunkCount = 16; + + { + ChunkBundler Store(TempDir.Path(), nullptr); + 
Store.Initialize("test", 16, true); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + const IoHash Hash = HashBuffer(Chunk); + auto InsertResult = Store.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + } + + const uint64_t TotalSize = Store.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } + + { + ChunkBundler Store(TempDir.Path(), nullptr); + Store.Initialize("test", 16, false); + + const uint64_t TotalSize = Store.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } +} + +TEST_CASE("chunkbundler.gc.basic") +{ + ScopedTemporaryDirectory TempDir; + + ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); + + ChunkBundlerStore.Initialize("cb", 1 << 4, true); + + IoBuffer Chunk = CreateChunk(128); + IoHash ChunkHash = IoHash::HashBuffer(Chunk); + + const auto InsertResult = ChunkBundlerStore.InsertChunk(Chunk, ChunkHash); + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + + ChunkBundlerStore.CollectGarbage(GcCtx); + + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHash)); +} + +TEST_CASE("chunkbundler.gc.compact") +{ + ScopedTemporaryDirectory TempDir; + + ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); + ChunkBundlerStore.Initialize("cb", 1 << 4, true); + + uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; + IoBuffer Chunks[9] = {CreateChunk(ChunkSizes[0]), + CreateChunk(ChunkSizes[1]), + CreateChunk(ChunkSizes[2]), + CreateChunk(ChunkSizes[3]), + CreateChunk(ChunkSizes[4]), + CreateChunk(ChunkSizes[5]), + CreateChunk(ChunkSizes[6]), + CreateChunk(ChunkSizes[7]), + CreateChunk(ChunkSizes[8])}; + IoHash ChunkHashes[9] = { + IoHash::HashBuffer(Chunks[0].Data(), Chunks[0].Size()), + IoHash::HashBuffer(Chunks[1].Data(), Chunks[1].Size()), + IoHash::HashBuffer(Chunks[2].Data(), Chunks[2].Size()), + IoHash::HashBuffer(Chunks[3].Data(), Chunks[3].Size()), + IoHash::HashBuffer(Chunks[4].Data(), Chunks[4].Size()), + 
IoHash::HashBuffer(Chunks[5].Data(), Chunks[5].Size()), + IoHash::HashBuffer(Chunks[6].Data(), Chunks[6].Size()), + IoHash::HashBuffer(Chunks[7].Data(), Chunks[7].Size()), + IoHash::HashBuffer(Chunks[8].Data(), Chunks[8].Size()), + }; + + CHECK(ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]).New); + CHECK(ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]).New); + CHECK(ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]).New); + CHECK(ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]).New); + CHECK(ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]).New); + CHECK(ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]).New); + CHECK(ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]).New); + CHECK(ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]).New); + CHECK(ChunkBundlerStore.InsertChunk(Chunks[8], ChunkHashes[8]).New); + + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[1])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[2])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[3])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[5])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[7])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); + + auto InitialSize = ChunkBundlerStore.StorageSize().DiskSize; + + // Keep first and last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + ChunkBundlerStore.CollectGarbage(GcCtx); + + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[4])); + 
CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[6])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); + } + + ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]); + ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]); + ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); + ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]); + ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); + ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]); + ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]); + + // Keep last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + ChunkBundlerStore.CollectGarbage(GcCtx); + + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[0])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[4])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[6])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); + + ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]); + ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]); + ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); + ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]); + ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); + ChunkBundlerStore.InsertChunk(Chunks[6], 
ChunkHashes[6]); + ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Keep mixed + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[1]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[7]); + GcCtx.ContributeCas(KeepChunks); + + ChunkBundlerStore.CollectGarbage(GcCtx); + + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[0])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[1])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[6])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[7])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[1] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[7]))); + + ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]); + ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]); + ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); + ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); + ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]); + ChunkBundlerStore.InsertChunk(Chunks[8], ChunkHashes[8]); + } + + // Keep multiple at end + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[7]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + ChunkBundlerStore.CollectGarbage(GcCtx); + + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[0])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1])); + 
CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[4])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[7])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); + + ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]); + ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]); + ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]); + ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); + ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]); + ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); + } + + // Verify that we nicely appended blocks even after all GC operations + CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[1] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[3] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[3]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[5] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[5]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); + + auto FinalSize = ChunkBundlerStore.StorageSize().DiskSize; + 
CHECK(InitialSize == FinalSize); +} + +void +chunkbundler_forcelink() +{ +} + +} // namespace zen diff --git a/zenstore/chunkbundler.h b/zenstore/chunkbundler.h new file mode 100644 index 000000000..498320a6a --- /dev/null +++ b/zenstore/chunkbundler.h @@ -0,0 +1,126 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#include <zenstore/basicfile.h> +#include <zenstore/cas.h> +#include <zenstore/caslog.h> +#include <zenstore/gc.h> + +#include <atomic> +#include <unordered_map> + +namespace spdlog { +class logger; +} + +namespace zen { + +class ChunkBundlerValidator +{ +public: + virtual bool ValidateChunk(IoBuffer Buffer, IoHash Key) = 0; +}; + +#pragma pack(push) +#pragma pack(1) + +struct CompactDiskLocation +{ + CompactDiskLocation(uint64_t InOffset, uint64_t InSize) + { + ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); + ZEN_ASSERT(InSize <= 0xff'ffff'ffff); + + memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); + memcpy(&m_Size[0], &InSize, sizeof m_Size); + } + + CompactDiskLocation() = default; + + inline uint64_t GetOffset() const + { + uint64_t Offset = 0; + memcpy(&Offset, &m_Offset, sizeof m_Offset); + return Offset; + } + + inline uint64_t GetSize() const + { + uint64_t Size = 0; + memcpy(&Size, &m_Size, sizeof m_Size); + return Size; + } + +private: + uint8_t m_Offset[5]; + uint8_t m_Size[5]; +}; + +struct CompactDiskIndexEntry +{ + static const uint8_t kTombstone = 0x01; + + IoHash Key; + CompactDiskLocation Location; + ZenContentType ContentType = ZenContentType::kUnknownContentType; + uint8_t Flags = 0; +}; + +#pragma pack(pop) + +static_assert(sizeof(CompactDiskIndexEntry) == 32); + +class ChunkBundler final +{ +public: + ChunkBundler(std::filesystem::path RootDirectory, ChunkBundlerValidator* Validator); + ~ChunkBundler(); + + struct InsertResult + { + bool New = false; + }; + + void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore); + InsertResult 
InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); + InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); + IoBuffer FindChunk(const IoHash& ChunkHash); + bool HaveChunk(const IoHash& ChunkHash); + void FilterChunks(CasChunkSet& InOutChunks); + void Flush(); + void Scrub(ScrubContext& Ctx); + void CollectGarbage(GcContext& GcCtx); + GcStorageSize StorageSize() const; + +private: + spdlog::logger& Log() { return m_Log; } + + ChunkBundlerValidator* m_Validator; + spdlog::logger& m_Log; + uint64_t m_PayloadAlignment = 1 << 4; + bool m_IsInitialized = false; + BasicFile m_SmallObjectFile; + BasicFile m_SmallObjectIndex; + std::filesystem::path RootDirectory; + TCasLogFile<CompactDiskIndexEntry> m_OpLog; + std::filesystem::path m_RootDirectory; + std::string m_ContainerBaseName; + + RwLock m_LocationMapLock; + RwLock m_InsertLock; // used to serialize inserts + std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher> m_LocationMap; + std::atomic_uint64_t m_CurrentInsertOffset{}; + std::atomic_uint64_t m_CurrentIndexOffset{}; + std::atomic_uint64_t m_TotalSize{}; + + void MakeIndexSnapshot(); +}; + +////////////////////////////////////////////////////////////////////////// + +void chunkbundler_forcelink(); + +} // namespace zen diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp index dbb3dbbf7..d5baf7e26 100644 --- a/zenstore/zenstore.cpp +++ b/zenstore/zenstore.cpp @@ -5,6 +5,7 @@ #include <zenstore/basicfile.h> #include <zenstore/cas.h> #include <zenstore/gc.h> +#include "chunkbundler.h" #include "compactcas.h" #include "filecas.h" @@ -18,6 +19,7 @@ zenstore_forcelinktests() filecas_forcelink(); compactcas_forcelink(); gc_forcelink(); + chunkbundler_forcelink(); } } // namespace zen |