diff options
| author | Dan Engelbrecht <[email protected]> | 2022-03-14 15:26:37 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-03-31 11:28:31 +0200 |
| commit | b9329e02b21a7bbffd32277c51ea0050e5856027 (patch) | |
| tree | 1d7ec65e06c1e79ce46c2d7eee9afa7b7cde0301 | |
| parent | WIP (diff) | |
| download | zen-b9329e02b21a7bbffd32277c51ea0050e5856027.tar.xz zen-b9329e02b21a7bbffd32277c51ea0050e5856027.zip | |
Split chunkbundler into size-limited blocks
| -rw-r--r-- | zenstore/chunkbundler.cpp | 209 | ||||
| -rw-r--r-- | zenstore/chunkbundler.h | 42 |
2 files changed, 150 insertions, 101 deletions
diff --git a/zenstore/chunkbundler.cpp b/zenstore/chunkbundler.cpp index ca5066e33..87b71df09 100644 --- a/zenstore/chunkbundler.cpp +++ b/zenstore/chunkbundler.cpp @@ -50,18 +50,58 @@ ChunkBundler::~ChunkBundler() } void -ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore) +ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); + ZEN_ASSERT(MaxBlockSize > 0); m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; + m_MaxBlockSize = MaxBlockSize; std::filesystem::path SobsPath = m_RootDirectory / (m_ContainerBaseName + ".ucas"); std::filesystem::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog"); - m_SmallObjectFile.Open(SobsPath, IsNewStore); + CreateDirectories(m_RootDirectory / "ucas"); + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_RootDirectory / "ucas")) + { + if (Entry.is_regular_file()) + { + if (IsNewStore) + { + std::filesystem::remove(Entry.path()); + continue; + } + if (Entry.path().extension() == ".ucas") + { + try + { + std::string stem = Entry.path().stem().string(); + uint16_t fileIndex = static_cast<uint16_t>(std::stoi(stem)); + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(Entry.path(), false); + m_OpenBlocks[fileIndex] = SmallObjectFile; + m_CurrentFileIndex = std::max<uint16_t>(m_CurrentFileIndex, fileIndex); + } + catch(const std::invalid_argument&) + { + // Non-valid file, skip it + } + } + } + } + if (m_OpenBlocks.empty()) + { + std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas"); + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(path, true); + m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile; + m_CurrentInsertOffset = 0; + } + + m_CurrentBlock = m_OpenBlocks[m_CurrentFileIndex]; + m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment)); m_OpLog.Open(SlogPath, IsNewStore); // TODO: should validate integrity of container files here @@ -98,16 +138,12 @@ ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t Alig } }); - uint64_t MaxFileOffset = 0; for (const auto& Entry : m_LocationMap) { const auto& Location = Entry.second; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Location.GetOffset() + Location.GetSize()); - m_TotalSize.fetch_add(Location.GetSize()); + m_TotalSize.fetch_add(Location.Size); } - m_CurrentInsertOffset = AlignPositon(MaxFileOffset, m_PayloadAlignment); - m_IsInitialized = true; } @@ -128,11 +164,22 @@ ChunkBundler::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& // New entry - const uint64_t InsertOffset = m_CurrentInsertOffset; - m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset); - m_CurrentInsertOffset = AlignPositon(m_CurrentInsertOffset + ChunkSize, m_PayloadAlignment); + uint64_t CurrentBlockSize = m_CurrentBlock.lock()->FileSize(); + if (CurrentBlockSize + m_CurrentInsertOffset > m_MaxBlockSize) + { + m_CurrentFileIndex++; + std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas"); + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(path, true); + m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile; + m_CurrentBlock = SmallObjectFile; + m_CurrentInsertOffset = 0; + } + const uint32_t InsertOffset = m_CurrentInsertOffset; + m_CurrentBlock.lock()->Write(ChunkData, ChunkSize, InsertOffset); + m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment)); - const CompactDiskLocation Location{InsertOffset, ChunkSize}; + const CompactDiskLocation Location{m_CurrentFileIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)}; CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; RwLock::ExclusiveLockScope __(m_LocationMapLock); @@ -158,7 +205,10 @@ ChunkBundler::FindChunk(const IoHash& ChunkHash) { const CompactDiskLocation& Location = KeyIt->second; - return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize()); + if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end()) + { + return IoBufferBuilder::MakeFromFileHandle(BlockIt->second->Handle(), Location.Offset, Location.Size); + } } // Not found @@ -196,16 +246,13 @@ void ChunkBundler::Flush() { m_OpLog.Flush(); - m_SmallObjectFile.Flush(); + m_CurrentBlock.lock()->Flush(); } void ChunkBundler::Scrub(ScrubContext& Ctx) { const uint64_t WindowSize = 4 * 1024 * 1024; - uint64_t WindowStart = 0; - uint64_t WindowEnd = WindowSize; - const uint64_t FileSize = m_SmallObjectFile.FileSize(); std::vector<CompactDiskIndexEntry> BigChunks; std::vector<CompactDiskIndexEntry> BadChunks; @@ -221,47 +268,57 @@ ChunkBundler::Scrub(ScrubContext& Ctx) RwLock::SharedLockScope _(m_LocationMapLock); - do + for (const auto& Block : m_OpenBlocks) { - const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); - m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); + uint64_t WindowStart = 0; + uint64_t WindowEnd = WindowSize; + auto& SmallObjectFile = *Block.second; + const uint64_t FileSize = SmallObjectFile.FileSize(); - for (auto& Entry : m_LocationMap) + do { - const uint64_t EntryOffset = Entry.second.GetOffset(); + const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); + SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); - if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) + for (auto& Entry : m_LocationMap) { - const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize(); + const uint64_t EntryOffset = Entry.second.Offset; - if (EntryEnd >= WindowEnd) + if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { - BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); - - continue; - } - - if (m_Validator && !m_Validator->ValidateChunk( - IoBuffer(IoBuffer::Wrap, - reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart, - Entry.second.GetSize()), - Entry.first)) - { - // Hash mismatch - BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + const uint64_t EntryEnd = EntryOffset + Entry.second.Size; + + if (EntryEnd >= WindowEnd) + { + BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + + continue; + } + + if (m_Validator && !m_Validator->ValidateChunk( + IoBuffer(IoBuffer::Wrap, + reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, + Entry.second.Size), + Entry.first)) + { + // Hash mismatch + BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + } } } - } - WindowStart += WindowSize; - WindowEnd += WindowSize; - } while (WindowStart < FileSize); -#if 0 + WindowStart += WindowSize; + WindowEnd += WindowSize; + } while (WindowStart < FileSize); + } + // TODO: Figure out API for this // Deal with large chunks - for (const auto& Entry : BigChunks) +#if 0 + for (const auto& _ : BigChunks) { - IoHashStream Hasher; + auto& ___ = *m_OpenBlocks[Entry.Location.BlockIndex]; + /* IoHashStream Hasher; m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); @@ -271,8 +328,9 @@ ChunkBundler::Scrub(ScrubContext& Ctx) { BadChunks.push_back(Entry); } + */ } -#endif +#endif // 0 } if (BadChunks.empty()) @@ -328,12 +386,16 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) // added betwen each move of a block. ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); + std::vector<IoHash> DeletedChunks; + std::unordered_set<int> BlocksToReWrite; { RwLock::ExclusiveLockScope _i(m_InsertLock); RwLock::ExclusiveLockScope _l(m_LocationMapLock); Flush(); + BlocksToReWrite.reserve(m_OpenBlocks.size()); + if (m_LocationMap.empty()) { ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_RootDirectory / m_ContainerBaseName); @@ -350,7 +412,6 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) TotalChunkHashes.push_back(Entry.first); } - std::vector<IoHash> DeletedChunks; std::vector<IoHash> ChunkHashes; // Same sort order as ChunkLocations ChunkHashes.reserve(m_LocationMap.size()); @@ -381,18 +442,18 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { auto LhsKeyIt = m_LocationMap.find(Lhs); auto RhsKeyIt = m_LocationMap.find(Rhs); - return LhsKeyIt->second.GetOffset() < RhsKeyIt->second.GetOffset(); + return LhsKeyIt->second.Offset < RhsKeyIt->second.Offset; }); uint64_t NewTotalSize = 0; std::vector<CompactDiskLocation> ChunkLocations; ChunkLocations.reserve(ChunkHashes.size()); - for (auto Entry : ChunkHashes) + for (auto ChunkHash : ChunkHashes) { - auto KeyIt = m_LocationMap.find(Entry); + auto KeyIt = m_LocationMap.find(ChunkHash); const auto& ChunkLocation = KeyIt->second; ChunkLocations.push_back(ChunkLocation); - NewTotalSize += ChunkLocation.GetSize(); + NewTotalSize += ChunkLocation.Size; } if (!CollectSmallObjects) @@ -406,20 +467,31 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) return; } - for (auto ChunkHash : reverse(DeletedChunks)) + for (auto ChunkHash : DeletedChunks) { - auto KeyIt = m_LocationMap.find(ChunkHash); - const auto& ChunkLocation = KeyIt->second; - uint64_t NextChunkOffset = ChunkLocation.GetOffset() + ChunkLocation.GetSize(); + auto KeyIt = m_LocationMap.find(ChunkHash); + const auto& ChunkLocation = KeyIt->second; + BlocksToReWrite.insert(ChunkLocation.BlockIndex); m_OpLog.Append({.Key = ChunkHash, .Location = ChunkLocation, .Flags = CompactDiskIndexEntry::kTombstone}); m_LocationMap.erase(ChunkHash); - if (m_CurrentInsertOffset == NextChunkOffset) - { - m_CurrentInsertOffset = ChunkLocation.GetOffset(); - } - m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.GetSize())); + m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.Size)); + } + + if (BlocksToReWrite.contains(m_CurrentFileIndex)) + { + m_CurrentFileIndex++; + std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas"); + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(path, true); + m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile; + m_CurrentBlock = SmallObjectFile; + m_CurrentInsertOffset = 0; } + } + { + // Rewrite all BlocksToReWrite +#if 0 // We can break here if we only want to remove items without compacting of space std::vector<IoHash> MovedChunks; @@ -511,15 +583,12 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) { m_CurrentInsertOffset = 0; } +#endif // 0 GcCtx.DeletedCas(DeletedChunks); - uint64_t CurrentSize = m_SmallObjectFile.FileSize(); - ZEN_INFO("garbage collection complete '{}', space {} to {}, moved {} and delete {} chunks", + ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_RootDirectory / m_ContainerBaseName, - NiceBytes(CurrentSize), - NiceBytes(m_CurrentInsertOffset), - MovedChunks.size(), DeletedChunks.size()); // TODO: Should we truncate the file or just keep the size of the file and reuse the space? } @@ -679,7 +748,7 @@ TEST_CASE("chunkbundler.reopen") { ChunkBundler Store(TempDir.Path(), nullptr); - Store.Initialize("test", 16, true); + Store.Initialize("test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { @@ -712,7 +781,7 @@ TEST_CASE("chunkbundler.reopen") { ChunkBundler Store(TempDir.Path(), nullptr); - Store.Initialize("test", 16, false); + Store.Initialize("test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { @@ -757,7 +826,7 @@ TEST_CASE("chunkbundler.totalsize") { ChunkBundler Store(TempDir.Path(), nullptr); - Store.Initialize("test", 16, true); + Store.Initialize("test", 65536, 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { @@ -773,7 +842,7 @@ TEST_CASE("chunkbundler.totalsize") { ChunkBundler Store(TempDir.Path(), nullptr); - Store.Initialize("test", 16, false); + Store.Initialize("test", 65536, 16, false); const uint64_t TotalSize = Store.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); @@ -786,7 +855,7 @@ TEST_CASE("chunkbundler.gc.basic") ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); - ChunkBundlerStore.Initialize("cb", 1 << 4, true); + ChunkBundlerStore.Initialize("cb", 65536, 1 << 4, true); IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); @@ -806,7 +875,7 @@ TEST_CASE("chunkbundler.gc.compact") ScopedTemporaryDirectory TempDir; ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); - ChunkBundlerStore.Initialize("cb", 1 << 4, true); + ChunkBundlerStore.Initialize("cb", 65536, 1 << 4, true); uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; IoBuffer Chunks[9] = {CreateChunk(ChunkSizes[0]), diff --git a/zenstore/chunkbundler.h b/zenstore/chunkbundler.h index 498320a6a..d84ee9627 100644 --- a/zenstore/chunkbundler.h +++ b/zenstore/chunkbundler.h @@ -29,34 +29,9 @@ public: struct CompactDiskLocation { - CompactDiskLocation(uint64_t InOffset, uint64_t InSize) - { - ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); - ZEN_ASSERT(InSize <= 0xff'ffff'ffff); - - memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); - memcpy(&m_Size[0], &InSize, sizeof m_Size); - } - - CompactDiskLocation() = default; - - inline uint64_t GetOffset() const - { - uint64_t Offset = 0; - memcpy(&Offset, &m_Offset, sizeof m_Offset); - return Offset; - } - - inline uint64_t GetSize() const - { - uint64_t Size = 0; - memcpy(&Size, &m_Size, sizeof m_Size); - return Size; - } - -private: - uint8_t m_Offset[5]; - uint8_t m_Size[5]; + uint16_t BlockIndex; + uint32_t Offset; + uint32_t Size; }; struct CompactDiskIndexEntry @@ -84,7 +59,7 @@ public: bool New = false; }; - void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore); + void Initialize(const std::string_view ContainerBaseName, uint64_t MaxBlockSize, uint64_t Alignment, bool IsNewStore); InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); @@ -101,8 +76,8 @@ private: ChunkBundlerValidator* m_Validator; spdlog::logger& m_Log; uint64_t m_PayloadAlignment = 1 << 4; + uint64_t m_MaxBlockSize = 1L << 30; // 1 Gb bool m_IsInitialized = false; - BasicFile m_SmallObjectFile; BasicFile m_SmallObjectIndex; std::filesystem::path RootDirectory; TCasLogFile<CompactDiskIndexEntry> m_OpLog; @@ -112,11 +87,16 @@ private: RwLock m_LocationMapLock; RwLock m_InsertLock; // used to serialize inserts std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher> m_LocationMap; - std::atomic_uint64_t m_CurrentInsertOffset{}; + std::atomic_uint32_t m_CurrentInsertOffset{}; std::atomic_uint64_t m_CurrentIndexOffset{}; std::atomic_uint64_t m_TotalSize{}; void MakeIndexSnapshot(); + + // Reserve one block of 1Gb + std::unordered_map<uint16_t, std::shared_ptr<BasicFile>> m_OpenBlocks; + std::weak_ptr<BasicFile> m_CurrentBlock; + uint16_t m_CurrentFileIndex = 0; }; ////////////////////////////////////////////////////////////////////////// |