aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-07 18:22:26 +0200
committerGitHub <[email protected]>2022-04-07 18:22:26 +0200
commit487352bb3e1de5a96268616bb335f9ef857cd629 (patch)
tree4b03eb73f02bf03d1b4671775c1c5277a7d7341a /zenstore/compactcas.cpp
parentAdd pre-commit config (#69) (diff)
parentclean up variable naming (diff)
downloadzen-487352bb3e1de5a96268616bb335f9ef857cd629.tar.xz
zen-487352bb3e1de5a96268616bb335f9ef857cd629.zip
Merge pull request #58 from EpicGames/de/cas-store-with-block-store
de/cas store with block store
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp2443
1 files changed, 2172 insertions, 271 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 3bf0c70df..51fe7a901 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -1,28 +1,24 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenstore/cas.h>
-
#include "compactcas.h"
-#include <zencore/compactbinarybuilder.h>
+#include <zenstore/cas.h>
+
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
-#include <zencore/memory.h>
-#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
-#include <zencore/thread.h>
-#include <zencore/uid.h>
-
-#include <zenstore/gc.h>
-
-#include <filesystem>
-#include <functional>
+#include <zencore/scopeguard.h>
#include <gsl/gsl-lite.hpp>
+#include <xxhash.h>
+
#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <zenstore/cidstore.h>
# include <algorithm>
# include <random>
#endif
@@ -31,6 +27,211 @@
namespace zen {
+struct CasDiskIndexHeader
+{
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t PayloadAlignment = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+};
+
+static_assert(sizeof(CasDiskIndexHeader) == 32);
+
+namespace {
+ std::vector<CasDiskIndexEntry> MakeCasDiskEntries(const std::unordered_map<IoHash, BlockStoreDiskLocation>& MovedChunks,
+ const std::vector<IoHash>& DeletedChunks)
+ {
+ std::vector<CasDiskIndexEntry> result;
+ result.reserve(MovedChunks.size());
+ for (const auto& MovedEntry : MovedChunks)
+ {
+ result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second});
+ }
+ for (const IoHash& ChunkHash : DeletedChunks)
+ {
+ result.push_back({.Key = ChunkHash, .Flags = CasDiskIndexEntry::kTombstone});
+ }
+ return result;
+ }
+
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".ulog";
+ const char* DataExtension = ".ucas";
+
+ std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return RootPath / ContainerBaseName;
+ }
+
+ std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension);
+ }
+
+ std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension);
+ }
+
+ std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension);
+ }
+
+ std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return GetBasePath(RootPath, ContainerBaseName) / "blocks";
+ }
+
+ std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
+ {
+ ExtendablePathBuilder<256> Path;
+
+ char BlockHexString[9];
+ ToHexNumber(BlockIndex, BlockHexString);
+
+ Path.Append(BlocksBasePath);
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
+ Path.AppendSeparator();
+ Path.Append(BlockHexString);
+ Path.Append(DataExtension);
+ return Path.ToPath();
+ }
+
+ std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return RootPath / (ContainerBaseName + LogExtension);
+ }
+
+ std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return RootPath / (ContainerBaseName + DataExtension);
+ }
+
+ std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return RootPath / (ContainerBaseName + IndexExtension);
+ }
+
+ struct LegacyCasDiskLocation
+ {
+ LegacyCasDiskLocation(uint64_t InOffset, uint64_t InSize)
+ {
+ ZEN_ASSERT(InOffset <= 0xff'ffff'ffff);
+ ZEN_ASSERT(InSize <= 0xff'ffff'ffff);
+
+ memcpy(&m_Offset[0], &InOffset, sizeof m_Offset);
+ memcpy(&m_Size[0], &InSize, sizeof m_Size);
+ }
+
+ LegacyCasDiskLocation() = default;
+
+ inline uint64_t GetOffset() const
+ {
+ uint64_t Offset = 0;
+ memcpy(&Offset, &m_Offset, sizeof m_Offset);
+ return Offset;
+ }
+
+ inline uint64_t GetSize() const
+ {
+ uint64_t Size = 0;
+ memcpy(&Size, &m_Size, sizeof m_Size);
+ return Size;
+ }
+
+ private:
+ uint8_t m_Offset[5];
+ uint8_t m_Size[5];
+ };
+
+ struct LegacyCasDiskIndexEntry
+ {
+ static const uint8_t kTombstone = 0x01;
+
+ IoHash Key;
+ LegacyCasDiskLocation Location;
+ ZenContentType ContentType = ZenContentType::kUnknownContentType;
+ uint8_t Flags = 0;
+ };
+
+ bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if ((Entry.Flags & ~LegacyCasDiskIndexEntry::kTombstone) != 0)
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Flags & LegacyCasDiskIndexEntry::kTombstone)
+ {
+ return true;
+ }
+ if (Entry.ContentType != ZenContentType::kUnknownContentType)
+ {
+ OutReason =
+ fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString());
+ return false;
+ }
+ uint64_t Size = Entry.Location.GetSize();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+ bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0)
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+ {
+ return true;
+ }
+ if (Entry.ContentType != ZenContentType::kUnknownContentType)
+ {
+ OutReason =
+ fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString());
+ return false;
+ }
+ uint64_t Size = Entry.Location.GetSize();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+} // namespace
+
+//////////////////////////////////////////////////////////////////////////
+
CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
: GcStorage(Gc)
, m_Config(Config)
@@ -43,13 +244,16 @@ CasContainerStrategy::~CasContainerStrategy()
}
void
-CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore)
+CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore)
{
ZEN_ASSERT(IsPow2(Alignment));
ZEN_ASSERT(!m_IsInitialized);
+ ZEN_ASSERT(MaxBlockSize > 0);
m_ContainerBaseName = ContainerBaseName;
m_PayloadAlignment = Alignment;
+ m_MaxBlockSize = MaxBlockSize;
+ m_BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName);
OpenContainer(IsNewStore);
@@ -59,36 +263,79 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
+ uint32_t WriteBlockIndex;
+ Ref<BlockStoreFile> WriteBlock;
+ uint64_t InsertOffset;
{
- RwLock::SharedLockScope _(m_LocationMapLock);
- auto KeyIt = m_LocationMap.find(ChunkHash);
+ RwLock::ExclusiveLockScope _(m_InsertLock);
- if (KeyIt != m_LocationMap.end())
{
- return CasStore::InsertResult{.New = false};
+ RwLock::SharedLockScope __(m_LocationMapLock);
+ if (m_LocationMap.contains(ChunkHash))
+ {
+ return CasStore::InsertResult{.New = false};
+ }
}
- }
-
- // New entry
-
- RwLock::ExclusiveLockScope _(m_InsertLock);
- const uint64_t InsertOffset = m_CurrentInsertOffset;
- m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset);
+ // New entry
- m_CurrentInsertOffset = (m_CurrentInsertOffset + ChunkSize + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
-
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
-
- const CasDiskLocation Location{InsertOffset, ChunkSize};
-
- m_LocationMap[ChunkHash] = Location;
-
- CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
+ WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ bool IsWriting = m_WriteBlock != nullptr;
+ if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize)
+ {
+ if (m_WriteBlock)
+ {
+ m_WriteBlock = nullptr;
+ }
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ throw std::runtime_error(
+ fmt::format("unable to allocate a new block in '{}'", m_Config.RootDirectory / m_ContainerBaseName));
+ }
+ WriteBlockIndex += IsWriting ? 1 : 0;
+ while (m_ChunkBlocks.contains(WriteBlockIndex))
+ {
+ WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
+ }
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ m_WriteBlock = new BlockStoreFile(BlockPath);
+ m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ }
+ m_CurrentInsertOffset = 0;
+ m_WriteBlock->Create(m_MaxBlockSize);
+ }
+ InsertOffset = m_CurrentInsertOffset;
+ m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment);
+ WriteBlock = m_WriteBlock;
+ }
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize));
+ // We can end up in a situation that InsertChunk writes the same chunk data in
+ // different locations.
+ // We release the insert lock once we have the correct WriteBlock ready and we know
+ // where to write the data. If a new InsertChunk request for the same chunk hash/data
+ // comes in before we update m_LocationMap below we will have a race.
+ // The outcome of that is that we will write the chunk data in more than one location
+ // but the chunk hash will only point to one of the chunks.
+ // We will in that case waste space until the next GC operation.
+ //
+ // This should be a rare occasion and the current flow reduces the time we block for
+ // reads, insert and GC.
+
+ BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment);
+ const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
+
+ WriteBlock->Write(ChunkData, ChunkSize, InsertOffset);
m_CasLog.Append(IndexEntry);
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_release);
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, Location);
+ }
+
return CasStore::InsertResult{.New = true};
}
@@ -101,31 +348,28 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
+ Ref<BlockStoreFile> ChunkBlock;
+ BlockStoreLocation Location;
{
- const CasDiskLocation& Location = KeyIt->second;
-
- return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize());
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
+ {
+ Location = KeyIt->second.Get(m_PayloadAlignment);
+ ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
+ }
+ else
+ {
+ return IoBuffer();
+ }
}
-
- // Not found
-
- return IoBuffer();
+ return ChunkBlock->GetChunk(Location.Offset, Location.Size);
}
bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
RwLock::SharedLockScope _(m_LocationMapLock);
-
- if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
- {
- return true;
- }
-
- return false;
+ return m_LocationMap.contains(ChunkHash);
}
void
@@ -144,20 +388,23 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
CasContainerStrategy::Flush()
{
- m_CasLog.Flush();
- m_SmallObjectIndex.Flush();
- m_SmallObjectFile.Flush();
+ {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ if (m_CurrentInsertOffset > 0)
+ {
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
+ m_WriteBlock = nullptr;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
+ }
+ }
+ MakeIndexSnapshot();
}
void
CasContainerStrategy::Scrub(ScrubContext& Ctx)
{
- const uint64_t WindowSize = 4 * 1024 * 1024;
- uint64_t WindowStart = 0;
- uint64_t WindowEnd = WindowSize;
- const uint64_t FileSize = m_SmallObjectFile.FileSize();
-
- std::vector<CasDiskIndexEntry> BigChunks;
std::vector<CasDiskIndexEntry> BadChunks;
// We do a read sweep through the payloads file and validate
@@ -166,62 +413,73 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
// pass. An alternative strategy would be to use memory mapping.
{
- IoBuffer ReadBuffer{WindowSize};
- void* BufferBase = ReadBuffer.MutableData();
+ std::vector<CasDiskIndexEntry> BigChunks;
+ const uint64_t WindowSize = 4 * 1024 * 1024;
+ IoBuffer ReadBuffer{WindowSize};
+ void* BufferBase = ReadBuffer.MutableData();
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time?
+ RwLock::SharedLockScope __(m_LocationMapLock);
- do
+ for (const auto& Block : m_ChunkBlocks)
{
- const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
- m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart);
+ uint64_t WindowStart = 0;
+ uint64_t WindowEnd = WindowSize;
+ const Ref<BlockStoreFile>& BlockFile = Block.second;
+ BlockFile->Open();
+ const uint64_t FileSize = BlockFile->FileSize();
- for (auto& Entry : m_LocationMap)
+ do
{
- const uint64_t EntryOffset = Entry.second.GetOffset();
+ const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
+ BlockFile->Read(BufferBase, ChunkSize, WindowStart);
- if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
+ for (auto& Entry : m_LocationMap)
{
- const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize();
+ const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment);
+ const uint64_t EntryOffset = Location.Offset;
- if (EntryEnd >= WindowEnd)
+ if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
{
- BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+ const uint64_t EntryEnd = EntryOffset + Location.Size;
- continue;
- }
+ if (EntryEnd >= WindowEnd)
+ {
+ BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
- const IoHash ComputedHash =
- IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart,
- Entry.second.GetSize());
+ continue;
+ }
- if (Entry.first != ComputedHash)
- {
- // Hash mismatch
+ const IoHash ComputedHash =
+ IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, Location.Size);
- BadChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+ if (Entry.first != ComputedHash)
+ {
+ // Hash mismatch
+ BadChunks.push_back({.Key = Entry.first, .Location = Entry.second, .Flags = CasDiskIndexEntry::kTombstone});
+ }
}
}
- }
- WindowStart += WindowSize;
- WindowEnd += WindowSize;
- } while (WindowStart < FileSize);
- }
-
- // Deal with large chunks
+ WindowStart += WindowSize;
+ WindowEnd += WindowSize;
+ } while (WindowStart < FileSize);
+ }
- for (const CasDiskIndexEntry& Entry : BigChunks)
- {
- IoHashStream Hasher;
- m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) {
- Hasher.Append(Data, Size);
- });
- IoHash ComputedHash = Hasher.GetHash();
+ // Deal with large chunks
- if (Entry.Key != ComputedHash)
+ for (const CasDiskIndexEntry& Entry : BigChunks)
{
- BadChunks.push_back(Entry);
+ IoHashStream Hasher;
+ const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment);
+ const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex];
+ BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
+ IoHash ComputedHash = Hasher.GetHash();
+
+ if (Entry.Key != ComputedHash)
+ {
+ BadChunks.push_back({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone});
+ }
}
}
@@ -230,17 +488,21 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
return;
}
- ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName);
+ ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_Config.RootDirectory / m_ContainerBaseName);
// Deal with bad chunks by removing them from our lookup map
std::vector<IoHash> BadChunkHashes;
+ BadChunkHashes.reserve(BadChunks.size());
- for (const CasDiskIndexEntry& Entry : BadChunks)
+ m_CasLog.Append(BadChunks);
{
- BadChunkHashes.push_back(Entry.Key);
- m_CasLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone});
- m_LocationMap.erase(Entry.Key);
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ for (const CasDiskIndexEntry& Entry : BadChunks)
+ {
+ BadChunkHashes.push_back(Entry.Key);
+ m_LocationMap.erase(Entry.Key);
+ }
}
// Let whomever it concerns know about the bad chunks. This could
@@ -253,243 +515,1106 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
void
CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
{
- namespace fs = std::filesystem;
-
- // A naive garbage collection implementation that just copies evicted chunks
- // into a new container file. We probably need to partition the container file
- // into several parts to prevent needing to keep the entire container file during GC.
+ // It collects all the blocks that we want to delete chunks from. For each such
+ // block we keep a list of chunks to retain and a list of chunks to delete.
+ //
+ // If there is a block that we are currently writing to, that block is omitted
+ // from the garbage collection.
+ //
+ // Next it will iterate over all blocks that we want to remove chunks from.
+ // If the block is empty after removal of chunks we mark the block as pending
+ // delete - we want to delete it as soon as there are no IoBuffers using the
+ // block file.
+ // Once complete we update the m_LocationMap by removing the chunks.
+ //
+ // If the block is non-empty we write out the chunks we want to keep to a new
+ // block file (creating new block files as needed).
+ //
+ // We update the index as we complete each new block file. This makes it possible
+ // to break the GC if we want to limit time for execution.
+ //
+ // GC can fairly parallell to regular operation - it will block while taking
+ // a snapshot of the current m_LocationMap state.
+ //
+ // While moving blocks it will do a blocking operation and update the m_LocationMap
+ // after each new block is written and figuring out the path to the next new block.
ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+ std::vector<IoHash> DeletedChunks;
+ uint64_t MovedCount = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([this,
+ &TotalTimer,
+ &WriteBlockTimeUs,
+ &WriteBlockLongestTimeUs,
+ &ReadBlockTimeUs,
+ &ReadBlockLongestTimeUs,
+ &TotalChunkCount,
+ &DeletedChunks,
+ &MovedCount,
+ &DeletedSize,
+ OldTotalSize] {
+ ZEN_INFO(
+ "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+ "#{} "
+ "of #{} "
+ "chunks ({}).",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedChunks.size(),
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ });
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ LocationMap_t LocationMap;
+ size_t BlockCount;
+ uint64_t ExcludeBlockIndex = 0x800000000ull;
+ {
+ RwLock::SharedLockScope __(m_InsertLock);
+ RwLock::SharedLockScope ___(m_LocationMapLock);
+ {
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_WriteBlock)
+ {
+ ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ }
+ __.ReleaseNow();
+ }
+ LocationMap = m_LocationMap;
+ BlockCount = m_ChunkBlocks.size();
+ }
+
+ if (LocationMap.empty())
+ {
+ ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName);
+ return;
+ }
- Flush();
+ TotalChunkCount = LocationMap.size();
- std::vector<IoHash> Candidates;
- std::vector<IoHash> ChunksToKeep;
- std::vector<IoHash> ChunksToDelete;
- const uint64_t ChunkCount = m_LocationMap.size();
- uint64_t TotalSize{};
+ std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
+ std::vector<std::vector<IoHash>> KeepChunks;
+ std::vector<std::vector<IoHash>> DeleteChunks;
- Candidates.reserve(m_LocationMap.size());
+ BlockIndexToChunkMapIndex.reserve(BlockCount);
+ KeepChunks.reserve(BlockCount);
+ DeleteChunks.reserve(BlockCount);
+ size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
- for (auto& Entry : m_LocationMap)
+ std::vector<IoHash> TotalChunkHashes;
+ TotalChunkHashes.reserve(TotalChunkCount);
+ for (const auto& Entry : LocationMap)
{
- Candidates.push_back(Entry.first);
- TotalSize += Entry.second.GetSize();
+ TotalChunkHashes.push_back(Entry.first);
}
- ChunksToKeep.reserve(Candidates.size());
- GcCtx.FilterCas(Candidates, [&ChunksToKeep, &ChunksToDelete](const IoHash& Hash, bool Keep) {
+ uint64_t DeleteCount = 0;
+
+ uint64_t NewTotalSize = 0;
+ GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ auto KeyIt = LocationMap.find(ChunkHash);
+ const BlockStoreDiskLocation& Location = KeyIt->second;
+ uint32_t BlockIndex = Location.GetBlockIndex();
+
+ if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex)
+ {
+ return;
+ }
+
+ auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex);
+ size_t ChunkMapIndex = 0;
+ if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
+ {
+ ChunkMapIndex = KeepChunks.size();
+ BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex;
+ KeepChunks.resize(ChunkMapIndex + 1);
+ KeepChunks.back().reserve(GuesstimateCountPerBlock);
+ DeleteChunks.resize(ChunkMapIndex + 1);
+ DeleteChunks.back().reserve(GuesstimateCountPerBlock);
+ }
+ else
+ {
+ ChunkMapIndex = BlockIndexPtr->second;
+ }
if (Keep)
{
- ChunksToKeep.push_back(Hash);
+ std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex];
+ ChunkMap.push_back(ChunkHash);
+ NewTotalSize += Location.GetSize();
}
else
{
- ChunksToDelete.push_back(Hash);
+ std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
+ ChunkMap.push_back(ChunkHash);
+ DeleteCount++;
}
});
- if (m_LocationMap.empty() || ChunksToKeep.size() == m_LocationMap.size())
+ std::unordered_set<uint32_t> BlocksToReWrite;
+ BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
+ for (const auto& Entry : BlockIndexToChunkMapIndex)
{
- ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete",
- ChunkCount,
- NiceBytes(TotalSize),
- m_Config.RootDirectory / m_ContainerBaseName);
+ uint32_t BlockIndex = Entry.first;
+ size_t ChunkMapIndex = Entry.second;
+ const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
+ if (ChunkMap.empty())
+ {
+ continue;
+ }
+ BlocksToReWrite.insert(BlockIndex);
+ }
+
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ if (!PerformDelete)
+ {
+ uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
+ ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ DeleteCount,
+ NiceBytes(TotalSize - NewTotalSize),
+ TotalChunkCount,
+ NiceBytes(TotalSize));
return;
}
- const uint64_t NewChunkCount = ChunksToKeep.size();
- uint64_t NewTotalSize = 0;
+ // Move all chunks in blocks that have chunks removed to new blocks
+
+ Ref<BlockStoreFile> NewBlockFile;
+ uint64_t WriteOffset = 0;
+ uint32_t NewBlockIndex = 0;
+ DeletedChunks.reserve(DeleteCount);
- for (const IoHash& Key : ChunksToKeep)
+ auto UpdateLocations = [this](const std::span<CasDiskIndexEntry>& Entries) {
+ for (const CasDiskIndexEntry& Entry : Entries)
+ {
+ if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+ {
+ auto KeyIt = m_LocationMap.find(Entry.Key);
+ uint64_t ChunkSize = KeyIt->second.GetSize();
+ m_TotalSize.fetch_sub(ChunkSize);
+ m_LocationMap.erase(KeyIt);
+ continue;
+ }
+ m_LocationMap[Entry.Key] = Entry.Location;
+ }
+ };
+
+ std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks;
+ for (uint32_t BlockIndex : BlocksToReWrite)
{
- const CasDiskLocation& Loc = m_LocationMap[Key];
- NewTotalSize += Loc.GetSize();
+ const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
+
+ Ref<BlockStoreFile> OldBlockFile;
+ {
+ RwLock::SharedLockScope _i(m_LocationMapLock);
+ OldBlockFile = m_ChunkBlocks[BlockIndex];
+ }
+
+ const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex];
+ if (KeepMap.empty())
+ {
+ const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap);
+ m_CasLog.Append(LogEntries);
+ m_CasLog.Flush();
+ {
+ RwLock::ExclusiveLockScope _i(m_LocationMapLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ UpdateLocations(LogEntries);
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ }
+ DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
+ ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'",
+ m_ContainerBaseName,
+ BlockIndex,
+ OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+ }
+ continue;
+ }
+
+ std::vector<uint8_t> Chunk;
+ for (const IoHash& ChunkHash : KeepMap)
+ {
+ auto KeyIt = LocationMap.find(ChunkHash);
+ const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment);
+ Chunk.resize(ChunkLocation.Size);
+ OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+
+ if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
+ {
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+ std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {});
+ m_CasLog.Append(LogEntries);
+ m_CasLog.Flush();
+
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ }
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ UpdateLocations(LogEntries);
+ if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ return;
+ }
+ while (m_ChunkBlocks.contains(NextBlockIndex))
+ {
+ NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
+ }
+ std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+ }
+
+ MovedCount += MovedBlockChunks.size();
+ MovedBlockChunks.clear();
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
+ return;
+ }
+ if (Space.Free < m_MaxBlockSize)
+ {
+ uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve();
+ if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ {
+ ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ RwLock::ExclusiveLockScope _l(m_LocationMapLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ m_ChunkBlocks.erase(NextBlockIndex);
+ return;
+ }
+
+ ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ ReclaimedSpace,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ }
+ NewBlockFile->Create(m_MaxBlockSize);
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
+ }
+
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ MovedBlockChunks.emplace(
+ ChunkHash,
+ BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment));
+ WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment);
+ }
+ Chunk.clear();
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ NewBlockFile = {};
+ }
+
+ const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap);
+ m_CasLog.Append(LogEntries);
+ m_CasLog.Flush();
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ UpdateLocations(LogEntries);
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ }
+ MovedCount += MovedBlockChunks.size();
+ DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
+ MovedBlockChunks.clear();
+
+ ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_ContainerBaseName, BlockIndex, OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+ }
+ OldBlockFile = nullptr;
}
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
- if (Error)
+ for (const IoHash& ChunkHash : DeletedChunks)
{
- ZEN_ERROR("get disk space FAILED, reason '{}'", Error.message());
- return;
+ DeletedSize += LocationMap[ChunkHash].GetSize();
}
- if (Space.Free < NewTotalSize + (64 << 20))
- {
- ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}",
+ GcCtx.DeletedCas(DeletedChunks);
+}
+
+void
+CasContainerStrategy::MakeIndexSnapshot()
+{
+ ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &EntryCount, &Timer] {
+ ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(NewTotalSize),
- NiceBytes(Space.Free));
- return;
- }
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
- const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ namespace fs = std::filesystem;
+
+ fs::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ fs::path TempIndexPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- if (!CollectSmallObjects)
+ // Move index away, we keep it if something goes wrong
+ if (fs::is_regular_file(TempIndexPath))
{
- ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- ChunkCount - NewChunkCount,
- NiceBytes(TotalSize - NewTotalSize),
- ChunkCount,
- NiceBytes(TotalSize));
- return;
+ fs::remove(TempIndexPath);
+ }
+ if (fs::is_regular_file(IndexPath))
+ {
+ fs::rename(IndexPath, TempIndexPath);
}
- fs::path TmpSobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ucas");
- fs::path TmpSlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ulog");
-
+ try
{
- ZEN_DEBUG("creating temporary container cas '{}'...", TmpSobsPath);
+ m_CasLog.Flush();
+
+ // Write the current state of the location map to a new index state
+ uint64_t LogCount = 0;
+ std::vector<CasDiskIndexEntry> Entries;
- TCasLogFile<CasDiskIndexEntry> TmpLog;
- BasicFile TmpObjectFile;
- bool IsNew = true;
+ {
+ RwLock::SharedLockScope __(m_InsertLock);
+ RwLock::SharedLockScope ___(m_LocationMapLock);
+ Entries.resize(m_LocationMap.size());
- TmpLog.Open(TmpSlogPath, IsNew);
- TmpObjectFile.Open(TmpSobsPath, IsNew);
+ uint64_t EntryIndex = 0;
+ for (auto& Entry : m_LocationMap)
+ {
+ CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++];
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = Entry.second;
+ }
- std::vector<uint8_t> Chunk;
- uint64_t NextInsertOffset{};
+ LogCount = m_CasLog.GetLogCount();
+ }
- for (const IoHash& Key : ChunksToKeep)
- {
- const auto Entry = m_LocationMap.find(Key);
- const auto& Loc = Entry->second;
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+ CasDiskIndexHeader Header = {.EntryCount = Entries.size(),
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+ Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header);
- Chunk.resize(Loc.GetSize());
- m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), Loc.GetOffset());
+ ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0);
+ ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry));
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.Close();
+ EntryCount = Entries.size();
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
- const uint64_t InsertOffset = NextInsertOffset;
- TmpObjectFile.Write(Chunk.data(), Chunk.size(), InsertOffset);
- TmpLog.Append({.Key = Key, .Location = {InsertOffset, Chunk.size()}});
+ // Restore any previous snapshot
- NextInsertOffset = (NextInsertOffset + Chunk.size() + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
+ if (fs::is_regular_file(TempIndexPath))
+ {
+ fs::remove(IndexPath);
+ fs::rename(TempIndexPath, IndexPath);
}
}
+ if (fs::is_regular_file(TempIndexPath))
+ {
+ fs::remove(TempIndexPath);
+ }
+}
- try
+uint64_t
+CasContainerStrategy::ReadIndexFile()
+{
+ std::vector<CasDiskIndexEntry> Entries;
+ std::filesystem::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ if (std::filesystem::is_regular_file(IndexPath))
{
- CloseContainer();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &Entries, &Timer] {
+ ZEN_INFO("read store '{}' index containing #{} entries in {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ Entries.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
- fs::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas");
- fs::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx");
- fs::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CasDiskIndexHeader))
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry);
+ CasDiskIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
+ (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+ (Header.EntryCount <= ExpectedEntryCount))
+ {
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+ m_PayloadAlignment = Header.PayloadAlignment;
- fs::remove(SobsPath);
- fs::remove(SidxPath);
- fs::remove(SlogPath);
+ std::string InvalidEntryReason;
+ for (const CasDiskIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ m_LocationMap[Entry.Key] = Entry.Location;
+ }
- fs::rename(TmpSobsPath, SobsPath);
- fs::rename(TmpSlogPath, SlogPath);
+ return Header.LogPosition;
+ }
+ else
+ {
+ ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+ }
+ }
+ }
+ return 0;
+}
+
+uint64_t
+CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
+{
+ std::vector<CasDiskIndexEntry> Entries;
+ std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &Entries, &Timer] {
+ ZEN_INFO("read store '{}' log containing #{} entries in {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ Entries.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ TCasLogFile<CasDiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (CasLog.Initialize())
{
- // Create a new empty index file
- BasicFile SidxFile;
- SidxFile.Open(SidxPath, true);
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+ uint64_t ReadCount = EntryCount - SkipEntryCount;
+ Entries.reserve(ReadCount);
+ CasLog.Replay(
+ [&](const CasDiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Flags & CasDiskIndexEntry::kTombstone)
+ {
+ m_LocationMap.erase(Record.Key);
+ return;
+ }
+ if (!ValidateEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ return;
+ }
+ m_LocationMap[Record.Key] = Record.Location;
+ },
+ SkipEntryCount);
+ return ReadCount;
}
+ }
+ return 0;
+}
- OpenContainer(false /* IsNewStore */);
+uint64_t
+CasContainerStrategy::MigrateLegacyData(bool CleanSource)
+{
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
- GcCtx.DeletedCas(ChunksToDelete);
+ if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
+ {
+ return 0;
+ }
- ZEN_INFO("garbage collect from '{}' DONE, collected #{} {} chunks of total #{} {}",
+ ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LegacyIndexPath = GetLegacyIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+
+ uint64_t MigratedChunkCount = 0;
+ uint32_t MigratedBlockCount = 0;
+ Stopwatch MigrationTimer;
+ uint64_t TotalSize = 0;
+ const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] {
+ ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
m_Config.RootDirectory / m_ContainerBaseName,
- ChunkCount - NewChunkCount,
- NiceBytes(TotalSize - NewTotalSize),
- ChunkCount,
+ MigratedChunkCount,
+ MigratedBlockCount,
+ NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
NiceBytes(TotalSize));
+ });
+
+ uint32_t WriteBlockIndex = 0;
+ while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
}
- catch (std::exception& Err)
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
+ if (Error)
{
- ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what());
+ ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
+ return 0;
+ }
+
+ if (Space.Free < m_MaxBlockSize)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free));
+ return 0;
+ }
+
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
- // Something went wrong, try create a new container
- OpenContainer(true /* IsNewStore */);
+ std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
+ uint64_t InvalidEntryCount = 0;
- GcCtx.DeletedCas(ChunksToDelete);
- GcCtx.DeletedCas(ChunksToKeep);
+ TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
+ LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
+ {
+ Stopwatch Timer;
+ const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] {
+ ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ LegacyDiskIndex.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ if (LegacyCasLog.Initialize())
+ {
+ LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
+ LegacyCasLog.Replay(
+ [&](const LegacyCasDiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
+ {
+ LegacyDiskIndex.erase(Record.Key);
+ return;
+ }
+ if (!ValidateLegacyEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
+ InvalidEntryCount++;
+ return;
+ }
+ LegacyDiskIndex.insert_or_assign(Record.Key, Record);
+ },
+ 0);
+
+ std::vector<IoHash> BadEntries;
+ uint64_t BlockFileSize = BlockFile.FileSize();
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyCasDiskIndexEntry& Record(Entry.second);
+ if (Record.Location.GetOffset() + Record.Location.GetSize() <= BlockFileSize)
+ {
+ continue;
+ }
+ ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
+ BadEntries.push_back(Entry.first);
+ }
+ for (const IoHash& BadHash : BadEntries)
+ {
+ LegacyDiskIndex.erase(BadHash);
+ }
+ InvalidEntryCount += BadEntries.size();
+ }
}
-}
-void
-CasContainerStrategy::MakeSnapshot()
-{
- RwLock::SharedLockScope _(m_LocationMapLock);
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_Config.RootDirectory / m_ContainerBaseName);
+ }
- std::vector<CasDiskIndexEntry> Entries{m_LocationMap.size()};
+ if (LegacyDiskIndex.empty())
+ {
+ BlockFile.Close();
+ LegacyCasLog.Close();
+ if (CleanSource)
+ {
+ // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
+ // a CAS manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySidx;
+ LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate);
+ }
+ return 0;
+ }
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_LocationMap)
+ for (const auto& Entry : LegacyDiskIndex)
{
- CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = Entry.second;
+ const LegacyCasDiskIndexEntry& Record(Entry.second);
+ TotalSize += Record.Location.GetSize();
}
- m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0);
-}
+ uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size());
+ uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize;
+ if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ MaxRequiredBlockCount,
+ BlockStoreDiskLocation::MaxBlockIndex);
+ return 0;
+ }
-void
-CasContainerStrategy::OpenContainer(bool IsNewStore)
-{
- std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas");
- std::filesystem::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx");
- std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
+ constexpr const uint64_t DiskReserve = 1ul << 28;
- m_SmallObjectFile.Open(SobsPath, IsNewStore);
- m_SmallObjectIndex.Open(SidxPath, IsNewStore);
- m_CasLog.Open(SlogPath, IsNewStore);
+ if (CleanSource)
+ {
+ if (Space.Free < (m_MaxBlockSize + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceBytes(m_MaxBlockSize + DiskReserve),
+ NiceBytes(Space.Free));
+ return 0;
+ }
+ }
+ else
+ {
+ if (Space.Free < (RequiredDiskSpace + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceBytes(RequiredDiskSpace + DiskReserve),
+ NiceBytes(Space.Free));
+ return 0;
+ }
+ }
- // TODO: should validate integrity of container files here
+ std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ CreateDirectories(LogPath.parent_path());
+ TCasLogFile<CasDiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- m_CurrentInsertOffset = 0;
- m_CurrentIndexOffset = 0;
- m_TotalSize = 0;
+ if (CleanSource && (MaxRequiredBlockCount < 2))
+ {
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(LegacyDiskIndex.size());
- m_LocationMap.clear();
+ // We can use the block as is, just move it and add the blocks to our new log
+ for (auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyCasDiskIndexEntry& Record(Entry.second);
- uint64_t MaxFileOffset = 0;
+ BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize());
+ BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment);
+ LogEntries.push_back(
+ {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
+ }
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ CreateDirectories(BlockPath.parent_path());
+ BlockFile.Close();
+ std::filesystem::rename(LegacyDataPath, BlockPath);
+ CasLog.Append(LogEntries);
+ for (const CasDiskIndexEntry& Entry : LogEntries)
+ {
+ m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
+ }
- m_CasLog.Replay([&](const CasDiskIndexEntry& Record) {
- if (Record.Flags & CasDiskIndexEntry::kTombstone)
+ MigratedChunkCount += LogEntries.size();
+ MigratedBlockCount++;
+ }
+ else
+ {
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(LegacyDiskIndex.size());
+ for (const auto& Entry : LegacyDiskIndex)
{
- m_TotalSize.fetch_sub(Record.Location.GetSize());
+ ChunkHashes.push_back(Entry.first);
}
- else
+
+ std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
+ auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
+ auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
+ return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset();
+ });
+
+ uint64_t BlockSize = 0;
+ uint64_t BlockOffset = 0;
+ std::vector<BlockStoreLocation> NewLocations;
+ struct BlockData
{
- m_TotalSize.fetch_add(Record.Location.GetSize());
- m_LocationMap[Record.Key] = Record.Location;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize());
+ std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
+ uint64_t BlockOffset;
+ uint64_t BlockSize;
+ uint32_t BlockIndex;
+ };
+
+ std::vector<BlockData> BlockRanges;
+ std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
+ BlockRanges.reserve(MaxRequiredBlockCount);
+ for (const IoHash& ChunkHash : ChunkHashes)
+ {
+ const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
+
+ uint64_t ChunkOffset = LegacyChunkLocation.GetOffset();
+ uint64_t ChunkSize = LegacyChunkLocation.GetSize();
+ uint64_t ChunkEnd = ChunkOffset + ChunkSize;
+
+ if (BlockSize == 0)
+ {
+ BlockOffset = ChunkOffset;
+ }
+ if ((ChunkEnd - BlockOffset) > m_MaxBlockSize)
+ {
+ BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
+ BlockRange.Chunks.swap(Chunks);
+ BlockRanges.push_back(BlockRange);
+
+ WriteBlockIndex++;
+ while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
+ }
+ BlockOffset = ChunkOffset;
+ BlockSize = 0;
+ }
+ BlockSize = RoundUp(BlockSize, m_PayloadAlignment);
+ BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
+ Chunks.push_back({ChunkHash, ChunkLocation});
+ BlockSize = ChunkEnd - BlockOffset;
}
- });
+ if (BlockSize > 0)
+ {
+ BlockRanges.push_back(
+ {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
+ }
+ Stopwatch WriteBlockTimer;
+
+ std::reverse(BlockRanges.begin(), BlockRanges.end());
+ std::vector<std::uint8_t> Buffer(1 << 28);
+ for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
+ {
+ const BlockData& BlockRange = BlockRanges[Idx];
+ if (Idx > 0)
+ {
+ uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
+ uint64_t Completed = BlockOffset + BlockSize - Remaining;
+ uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
+
+ ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ Idx,
+ BlockRanges.size(),
+ NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
+ NiceBytes(BlockOffset + BlockSize),
+ NiceTimeSpanMs(ETA));
+ }
+
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex);
+ BlockStoreFile ChunkBlock(BlockPath);
+ ChunkBlock.Create(BlockRange.BlockSize);
+ uint64_t Offset = 0;
+ while (Offset < BlockRange.BlockSize)
+ {
+ uint64_t Size = BlockRange.BlockSize - Offset;
+ if (Size > Buffer.size())
+ {
+ Size = Buffer.size();
+ }
+ BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
+ ChunkBlock.Write(Buffer.data(), Size, Offset);
+ Offset += Size;
+ }
+ ChunkBlock.Truncate(Offset);
+ ChunkBlock.Flush();
+
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(BlockRange.Chunks.size());
+ for (const auto& Entry : BlockRange.Chunks)
+ {
+ const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first];
+ BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment);
+ LogEntries.push_back(
+ {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags});
+ }
+ CasLog.Append(LogEntries);
+ for (const CasDiskIndexEntry& Entry : LogEntries)
+ {
+ m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
+ }
+ MigratedChunkCount += LogEntries.size();
+ MigratedBlockCount++;
+
+ if (CleanSource)
+ {
+ std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
+ LegacyLogEntries.reserve(BlockRange.Chunks.size());
+ for (const auto& Entry : BlockRange.Chunks)
+ {
+ LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone});
+ }
+ LegacyCasLog.Append(LegacyLogEntries);
+ BlockFile.SetFileSize(BlockRange.BlockOffset);
+ }
+ }
+ }
- m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
- m_CurrentIndexOffset = m_SmallObjectIndex.FileSize();
+ BlockFile.Close();
+ LegacyCasLog.Close();
+ CasLog.Close();
+
+ if (CleanSource)
+ {
+ // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
+ // a CAS manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySidx;
+ LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate);
+ }
+ return MigratedChunkCount;
}
void
-CasContainerStrategy::CloseContainer()
+CasContainerStrategy::OpenContainer(bool IsNewStore)
{
- m_SmallObjectFile.Close();
- m_SmallObjectIndex.Close();
- m_CasLog.Close();
+ // Add .running file and delete on clean on close to detect bad termination
+ m_TotalSize = 0;
+
+ m_LocationMap.clear();
+
+ std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName);
+
+ if (IsNewStore)
+ {
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+
+ std::filesystem::remove(LegacyLogPath);
+ std::filesystem::remove(LegacyDataPath);
+ std::filesystem::remove_all(BasePath);
+ }
+
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
+ uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+
+ CreateDirectories(BasePath);
+
+ std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ std::unordered_set<uint32_t> KnownBlocks;
+ for (const auto& Entry : m_LocationMap)
+ {
+ const BlockStoreDiskLocation& Location = Entry.second;
+ m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_release);
+ KnownBlocks.insert(Location.GetBlockIndex());
+ }
+
+ if (std::filesystem::is_directory(m_BlocksBasePath))
+ {
+ std::vector<std::filesystem::path> FoldersToScan;
+ FoldersToScan.push_back(m_BlocksBasePath);
+ size_t FolderOffset = 0;
+ while (FolderOffset < FoldersToScan.size())
+ {
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
+ {
+ if (Entry.is_directory())
+ {
+ FoldersToScan.push_back(Entry.path());
+ continue;
+ }
+ if (Entry.is_regular_file())
+ {
+ const std::filesystem::path Path = Entry.path();
+ if (Path.extension() != DataExtension)
+ {
+ continue;
+ }
+ std::string FileName = Path.stem().string();
+ uint32_t BlockIndex;
+ bool OK = ParseHexNumber(FileName, BlockIndex);
+ if (!OK)
+ {
+ continue;
+ }
+ if (!KnownBlocks.contains(BlockIndex))
+ {
+ // Log removing unreferenced block
+ // Clear out unused blocks
+ ZEN_INFO("removing unused block for '{}' at '{}'", m_ContainerBaseName, Path);
+ std::error_code Ec;
+ std::filesystem::remove(Path, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
+ }
+ continue;
+ }
+ Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
+ BlockFile->Open();
+ m_ChunkBlocks[BlockIndex] = BlockFile;
+ }
+ }
+ ++FolderOffset;
+ }
+ }
+ else
+ {
+ CreateDirectories(m_BlocksBasePath);
+ }
+
+ if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ {
+ MakeIndexSnapshot();
+ }
+
+ // TODO: should validate integrity of container files here
}
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
-TEST_CASE("cas.compact.gc")
+namespace {
+ static IoBuffer CreateChunk(uint64_t Size)
+ {
+ static std::random_device rd;
+ static std::mt19937 g(rd());
+
+ std::vector<uint8_t> Values;
+ Values.resize(Size);
+ for (size_t Idx = 0; Idx < Size; ++Idx)
+ {
+ Values[Idx] = static_cast<uint8_t>(Idx);
+ }
+ std::shuffle(Values.begin(), Values.end(), g);
+
+ return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
+ }
+} // namespace
+
+TEST_CASE("compactcas.hex")
+{
+ uint32_t Value;
+ std::string HexString;
+ CHECK(!ParseHexNumber("", Value));
+ char Hex[9];
+
+ ToHexNumber(0u, Hex);
+ HexString = std::string(Hex);
+ CHECK(ParseHexNumber(HexString, Value));
+ CHECK(Value == 0u);
+
+ ToHexNumber(std::numeric_limits<std::uint32_t>::max(), Hex);
+ HexString = std::string(Hex);
+ CHECK(HexString == "ffffffff");
+ CHECK(ParseHexNumber(HexString, Value));
+ CHECK(Value == std::numeric_limits<std::uint32_t>::max());
+
+ ToHexNumber(0xadf14711u, Hex);
+ HexString = std::string(Hex);
+ CHECK(HexString == "adf14711");
+ CHECK(ParseHexNumber(HexString, Value));
+ CHECK(Value == 0xadf14711u);
+
+ ToHexNumber(0x80000000u, Hex);
+ HexString = std::string(Hex);
+ CHECK(HexString == "80000000");
+ CHECK(ParseHexNumber(HexString, Value));
+ CHECK(Value == 0x80000000u);
+
+ ToHexNumber(0x718293a4u, Hex);
+ HexString = std::string(Hex);
+ CHECK(HexString == "718293a4");
+ CHECK(ParseHexNumber(HexString, Value));
+ CHECK(Value == 0x718293a4u);
+}
+
+TEST_CASE("compactcas.compact.gc")
{
ScopedTemporaryDirectory TempDir;
CasStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path();
-
CreateDirectories(CasConfig.RootDirectory);
const int kIterationCount = 1000;
@@ -499,7 +1624,7 @@ TEST_CASE("cas.compact.gc")
{
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, true);
+ Cas.Initialize("test", 65536, 16, true);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -533,7 +1658,7 @@ TEST_CASE("cas.compact.gc")
{
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, false);
+ Cas.Initialize("test", 65536, 16, false);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -545,67 +1670,843 @@ TEST_CASE("cas.compact.gc")
CHECK_EQ(Value["id"].AsInt32(), i);
}
-
- GcContext Ctx;
- Cas.CollectGarbage(Ctx);
}
}
-TEST_CASE("cas.compact.totalsize")
+TEST_CASE("compactcas.compact.totalsize")
{
std::random_device rd;
std::mt19937 g(rd());
- const auto CreateChunk = [&](uint64_t Size) -> IoBuffer {
- const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t));
- std::vector<uint32_t> Values;
- Values.resize(Count);
- for (size_t Idx = 0; Idx < Count; ++Idx)
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const uint64_t kChunkSize = 1024;
+ const int32_t kChunkCount = 16;
+
{
- Values[Idx] = static_cast<uint32_t>(Idx);
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 65536, 16, true);
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ const IoHash Hash = HashBuffer(Chunk);
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ }
+
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
}
- std::shuffle(Values.begin(), Values.end(), g);
- return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t));
- };
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 65536, 16, false);
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ }
+
+ // Re-open again, this time we should have a snapshot
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 65536, 16, false);
+
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ }
+ }
+}
+
+TEST_CASE("compactcas.gc.basic")
+{
ScopedTemporaryDirectory TempDir;
CasStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("cb", 65536, 1 << 4, true);
+
+ IoBuffer Chunk = CreateChunk(128);
+ IoHash ChunkHash = IoHash::HashBuffer(Chunk);
+ const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
+ CHECK(InsertResult.New);
+ Cas.Flush();
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHash));
+}
+
+TEST_CASE("compactcas.gc.removefile")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
CreateDirectories(CasConfig.RootDirectory);
- const uint64_t kChunkSize = 1024;
- const int32_t kChunkCount = 16;
+ IoBuffer Chunk = CreateChunk(128);
+ IoHash ChunkHash = IoHash::HashBuffer(Chunk);
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("cb", 65536, 1 << 4, true);
+
+ const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
+ CHECK(InsertResult.New);
+ const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash);
+ CHECK(!InsertResultDup.New);
+ Cas.Flush();
+ }
+
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("cb", 65536, 1 << 4, false);
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHash));
+}
+
+TEST_CASE("compactcas.gc.compact")
+{
+ // for (uint32_t i = 0; i < 100; ++i)
{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, true);
+ Cas.Initialize("cb", 2048, 1 << 4, true);
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(9);
+ for (uint64_t Size : ChunkSizes)
{
- IoBuffer Chunk = CreateChunk(kChunkSize);
- const IoHash Hash = HashBuffer(Chunk);
- auto InsertResult = Cas.InsertChunk(Chunk, Hash);
- ZEN_ASSERT(InsertResult.New);
+ Chunks.push_back(CreateChunk(Size));
}
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(9);
+ for (const IoBuffer& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New);
+ CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New);
+ CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New);
+ CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New);
+ CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New);
+ CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New);
+ CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New);
+ CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New);
+ CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ uint64_t InitialSize = Cas.StorageSize().DiskSize;
+
+ // Keep first and last
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[0]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+ }
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+
+ // Keep last
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
+ // Keep mixed
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[1]);
+ KeepChunks.push_back(ChunkHashes[4]);
+ KeepChunks.push_back(ChunkHashes[7]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+
+ Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[8], ChunkHashes[8]);
+ }
+
+ // Keep multiple at end
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[6]);
+ KeepChunks.push_back(ChunkHashes[7]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ }
+
+ // Keep every other
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[0]);
+ KeepChunks.push_back(ChunkHashes[2]);
+ KeepChunks.push_back(ChunkHashes[4]);
+ KeepChunks.push_back(ChunkHashes[6]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
+ // Verify that we nicely appended blocks even after all GC operations
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
+ CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
+ CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5])));
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ uint64_t FinalSize = Cas.StorageSize().DiskSize;
+ CHECK(InitialSize == FinalSize);
}
+}
+
+TEST_CASE("compactcas.gc.deleteblockonopen")
+{
+ ScopedTemporaryDirectory TempDir;
+ uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(20);
+ for (uint64_t Size : ChunkSizes)
{
+ Chunks.push_back(CreateChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(20);
+ for (const IoBuffer& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 1024, 16, true);
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
+ }
+
+ // GC every other block
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ for (size_t i = 0; i < 20; i += 2)
+ {
+ KeepChunks.push_back(ChunkHashes[i]);
+ }
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ for (size_t i = 0; i < 20; i += 2)
+ {
+ CHECK(Cas.HaveChunk(ChunkHashes[i]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
+ }
+ }
+ }
+ {
+ // Re-open
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 1024, 16, false);
+
+ for (size_t i = 0; i < 20; i += 2)
+ {
+ CHECK(Cas.HaveChunk(ChunkHashes[i]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
+ }
+ }
+}
+
+TEST_CASE("compactcas.gc.handleopeniobuffer")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(20);
+ for (const uint64_t& Size : ChunkSizes)
+ {
+ Chunks.push_back(CreateChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(20);
+ for (const IoBuffer& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 1024, 16, true);
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
+ }
+
+ IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]);
+ Cas.Flush();
+
+ // GC everything
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ Cas.CollectGarbage(GcCtx);
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(!Cas.HaveChunk(ChunkHashes[i]));
+ }
+
+ CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk));
+}
+
+TEST_CASE("compactcas.legacyconversion")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339, 561, 16, 16, 2048, 2048};
+ size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
+ size_t SingleBlockSize = 0;
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(ChunkCount);
+ for (uint64_t Size : ChunkSizes)
+ {
+ Chunks.push_back(CreateChunk(Size));
+ SingleBlockSize += Size;
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(ChunkCount);
+ for (const IoBuffer& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", gsl::narrow<uint32_t>(SingleBlockSize * 2), 16, true);
+
+ for (size_t i = 0; i < ChunkCount; i++)
+ {
+ CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
+ }
+
+ std::vector<IoHash> KeepChunks;
+ for (size_t i = 0; i < ChunkCount; i += 2)
+ {
+ KeepChunks.push_back(ChunkHashes[i]);
+ }
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepChunks);
+ Cas.Flush();
+ Gc.CollectGarbage(GcCtx);
+ }
+
+ std::filesystem::path BlockPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test");
+ std::filesystem::rename(BlockPath, LegacyDataPath);
+
+ std::vector<CasDiskIndexEntry> LogEntries;
+ std::filesystem::path IndexPath = GetIndexPath(CasConfig.RootDirectory, "test");
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CasDiskIndexHeader))
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry);
+ CasDiskIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion &&
+ Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
+ {
+ LogEntries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+ }
+ }
+ ObjectIndexFile.Close();
+ std::filesystem::remove(IndexPath);
+ }
+
+ std::filesystem::path LogPath = GetLogPath(CasConfig.RootDirectory, "test");
+ {
+ TCasLogFile<CasDiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ LogEntries.reserve(CasLog.GetLogCount());
+ CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
+ }
+ TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
+ std::filesystem::path LegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test");
+ LegacyCasLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
+
+ for (const CasDiskIndexEntry& Entry : LogEntries)
+ {
+ BlockStoreLocation Location = Entry.Location.Get(16);
+ LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size);
+ LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key,
+ .Location = LegacyLocation,
+ .ContentType = Entry.ContentType,
+ .Flags = Entry.Flags};
+ LegacyCasLog.Append(LegacyEntry);
+ }
+ LegacyCasLog.Close();
+
+ std::filesystem::remove_all(CasConfig.RootDirectory / "test");
+
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 2048, 16, false);
+
+ for (size_t i = 0; i < ChunkCount; i += 2)
+ {
+ CHECK(Cas.HaveChunk(ChunkHashes[i]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
+ }
+ }
+}
+
+TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
+{
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 8192;
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(kChunkCount);
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(kChunkCount);
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ ChunkHashes.emplace_back(Hash);
+ Chunks.emplace_back(Chunk);
+ }
+
+ WorkerThreadPool ThreadPool(4);
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, false);
+ Cas.Initialize("test", 32768, 16, true);
+ {
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ const IoBuffer& Chunk = Chunks[Idx];
+ const IoHash& Hash = ChunkHashes[Idx];
+ ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() {
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ });
+ }
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+ }
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+
+ {
+ std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() {
+ IoHash ChunkHash = OldChunkHashes[Idx];
+ IoBuffer Chunk = Cas.FindChunk(ChunkHash);
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ CHECK(ChunkHash == Hash);
+ });
+ }
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+ }
+
+ std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ {
+ std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ std::vector<IoHash> NewChunkHashes;
+ NewChunkHashes.reserve(kChunkCount);
+ std::vector<IoBuffer> NewChunks;
+ NewChunks.reserve(kChunkCount);
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunkHashes.emplace_back(Hash);
+ NewChunks.emplace_back(Chunk);
+ }
+
+ RwLock ChunkHashesLock;
+ std::atomic_uint32_t AddedChunkCount;
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ const IoBuffer& Chunk = NewChunks[Idx];
+ const IoHash& Hash = NewChunkHashes[Idx];
+ ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() {
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ AddedChunkCount.fetch_add(1);
+ });
+ ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() {
+ IoHash ChunkHash = OldChunkHashes[Idx];
+ IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]);
+ if (Chunk)
+ {
+ CHECK(ChunkHash == IoHash::HashBuffer(Chunk));
+ }
+ });
+ }
+
+ while (AddedChunkCount.load() < kChunkCount)
+ {
+ std::vector<IoHash> AddedHashes;
+ {
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ AddedHashes.swap(NewChunkHashes);
+ }
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const IoHash& ChunkHash : AddedHashes)
+ {
+ if (Cas.HaveChunk(ChunkHash))
+ {
+ GcChunkHashes.emplace(ChunkHash);
+ }
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+
+ {
+ std::vector<IoHash> AddedHashes;
+ {
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ AddedHashes.swap(NewChunkHashes);
+ }
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const IoHash& ChunkHash : AddedHashes)
+ {
+ if (Cas.HaveChunk(ChunkHash))
+ {
+ GcChunkHashes.emplace(ChunkHash);
+ }
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 77 == 0 && C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+ }
+ {
+ for (const IoHash& ChunkHash : GcChunkHashes)
+ {
+ ThreadPool.ScheduleWork([&Cas, ChunkHash]() {
+ CHECK(Cas.HaveChunk(ChunkHash));
+ CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ });
+ }
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+ }
}
}
+TEST_CASE("compactcas.migrate.large.data" * doctest::skip(true))
+{
+ const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas";
+ std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs");
+ std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs");
+ std::filesystem::remove_all(TobsBasePath);
+ std::filesystem::remove_all(SobsBasePath);
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = BigDataPath;
+ uint64_t TObsSize = 0;
+ {
+ CasGc TobsCasGc;
+ CasContainerStrategy TobsCas(CasConfig, TobsCasGc);
+ TobsCas.Initialize("tobs", 1u << 28, 16, false);
+ TObsSize = TobsCas.StorageSize().DiskSize;
+ CHECK(TObsSize > 0);
+ }
+
+ uint64_t SObsSize = 0;
+ {
+ CasGc SobsCasGc;
+ CasContainerStrategy SobsCas(CasConfig, SobsCasGc);
+ SobsCas.Initialize("sobs", 1u << 30, 4096, false);
+ SObsSize = SobsCas.StorageSize().DiskSize;
+ CHECK(SObsSize > 0);
+ }
+
+ CasGc TobsCasGc;
+ CasContainerStrategy TobsCas(CasConfig, TobsCasGc);
+ TobsCas.Initialize("tobs", 1u << 28, 16, false);
+ GcContext TobsGcCtx;
+ TobsCas.CollectGarbage(TobsGcCtx);
+ CHECK(TobsCas.StorageSize().DiskSize == TObsSize);
+
+ CasGc SobsCasGc;
+ CasContainerStrategy SobsCas(CasConfig, SobsCasGc);
+ SobsCas.Initialize("sobs", 1u << 30, 4096, false);
+ GcContext SobsGcCtx;
+ SobsCas.CollectGarbage(SobsGcCtx);
+ CHECK(SobsCas.StorageSize().DiskSize == SObsSize);
+}
+
#endif
void