aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--zenstore/chunkbundler.cpp689
1 files changed, 398 insertions, 291 deletions
diff --git a/zenstore/chunkbundler.cpp b/zenstore/chunkbundler.cpp
index 2c1924303..5b9242eec 100644
--- a/zenstore/chunkbundler.cpp
+++ b/zenstore/chunkbundler.cpp
@@ -26,18 +26,6 @@ AlignPositon(uint64_t Offset, uint64_t Alignment)
return (Offset + Alignment - 1) & ~(Alignment - 1);
}
-template<typename T>
-class reverse
-{
-private:
- T& iterable_;
-
-public:
- explicit reverse(T& iterable) : iterable_{iterable} {}
- auto begin() const { return std::rbegin(iterable_); }
- auto end() const { return std::rend(iterable_); }
-};
-
ChunkBundler::ChunkBundler(std::filesystem::path RootDirectory, ChunkBundlerValidator* Validator)
: m_Validator(Validator)
, m_Log(logging::Get("chunkbundler"))
@@ -63,51 +51,7 @@ ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxB
std::filesystem::path SobsPath = m_RootDirectory / (m_ContainerBaseName + ".ucas");
std::filesystem::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog");
- CreateDirectories(m_RootDirectory / "ucas");
- for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_RootDirectory / "ucas"))
- {
- if (Entry.is_regular_file())
- {
- if (IsNewStore)
- {
- std::filesystem::remove(Entry.path());
- continue;
- }
- if (Entry.path().extension() == ".ucas")
- {
- try
- {
- std::string stem = Entry.path().stem().string();
- uint16_t fileIndex = static_cast<uint16_t>(std::stoi(stem));
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(Entry.path(), false);
- m_OpenBlocks[fileIndex] = SmallObjectFile;
- m_CurrentFileIndex = std::max<uint16_t>(m_CurrentFileIndex, fileIndex);
- }
- catch(const std::invalid_argument&)
- {
- // Non-valid file, skip it
- }
- }
- }
- }
- if (m_OpenBlocks.empty())
- {
- std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas");
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(path, true);
- m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile;
- m_CurrentInsertOffset = 0;
- }
-
- m_CurrentBlock = m_OpenBlocks[m_CurrentFileIndex];
- m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment));
- m_OpLog.Open(SlogPath, IsNewStore);
-
- // TODO: should validate integrity of container files here
-
- m_CurrentInsertOffset = 0;
- m_TotalSize = 0;
+ m_TotalSize = 0;
m_LocationMap.clear();
@@ -127,6 +71,7 @@ ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxB
SmallObjectIndex.Close();
}
+ m_OpLog.Open(SlogPath, IsNewStore);
m_OpLog.Replay([&](const CompactDiskIndexEntry& Record) {
if (Record.Flags & CompactDiskIndexEntry::kTombstone)
{
@@ -138,12 +83,71 @@ ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxB
}
});
+ std::unordered_set<uint16_t> ReferencedBlockIndexes;
for (const auto& Entry : m_LocationMap)
{
const auto& Location = Entry.second;
m_TotalSize.fetch_add(Location.Size);
+ ReferencedBlockIndexes.insert(Location.BlockIndex);
+ }
+
+ uint32_t SmallestBlockSize = 0xffffffffu;
+ CreateDirectories(m_RootDirectory / "ucas");
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_RootDirectory / "ucas"))
+ {
+ if (Entry.is_regular_file())
+ {
+ if (IsNewStore)
+ {
+ std::filesystem::remove(Entry.path());
+ continue;
+ }
+ if (Entry.path().extension() == ".ucas")
+ {
+ try
+ {
+ std::string FileName = Entry.path().stem().string();
+ uint16_t BlockIndex = static_cast<uint16_t>(std::stoi(FileName));
+ if (!ReferencedBlockIndexes.contains(BlockIndex))
+ {
+ // Clear out unused blocks
+ std::filesystem::remove(Entry.path());
+ continue;
+ }
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(Entry.path(), false);
+ m_OpenBlocks[BlockIndex] = SmallObjectFile;
+ if (SmallObjectFile->FileSize() < SmallestBlockSize)
+ {
+ m_CurrentBlockIndex = BlockIndex;
+ SmallestBlockSize = gsl::narrow<std::uint32_t>(SmallObjectFile->FileSize());
+ }
+ }
+ catch (const std::invalid_argument&)
+ {
+ // Non-valid file, skip it (or should we remove it?)
+ }
+ }
+ }
+ }
+ if (m_OpenBlocks.empty())
+ {
+ std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas");
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(path, true);
+ m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
+ m_CurrentBlock = SmallObjectFile;
+ m_CurrentInsertOffset = 0;
+
+ }
+ else
+ {
+ m_CurrentBlock = m_OpenBlocks[m_CurrentBlockIndex];
+ m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment));
}
+ // TODO: should validate integrity of container files here
+
m_IsInitialized = true;
}
@@ -168,19 +172,28 @@ ChunkBundler::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash&
if (CurrentBlockSize + m_CurrentInsertOffset > m_MaxBlockSize)
{
RwLock::ExclusiveLockScope __(m_LocationMapLock);
- m_CurrentFileIndex++;
- std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas");
+ uint16_t NewBlockIndex = m_CurrentBlockIndex + 1;
+ while (m_OpenBlocks.contains(NewBlockIndex))
+ {
+ NewBlockIndex++;
+ if (NewBlockIndex == m_CurrentBlockIndex)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName));
+ }
+ }
+ m_CurrentBlockIndex = NewBlockIndex;
+ std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas");
auto SmallObjectFile = std::make_shared<BasicFile>();
SmallObjectFile->Open(path, true);
- m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile;
- m_CurrentBlock = SmallObjectFile;
- m_CurrentInsertOffset = 0;
+ m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
+ m_CurrentBlock = SmallObjectFile;
+ m_CurrentInsertOffset = 0;
}
const uint32_t InsertOffset = m_CurrentInsertOffset;
m_CurrentBlock.lock()->Write(ChunkData, ChunkSize, InsertOffset);
m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment));
- const CompactDiskLocation Location{m_CurrentFileIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)};
+ const CompactDiskLocation Location{m_CurrentBlockIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)};
CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
RwLock::ExclusiveLockScope __(m_LocationMapLock);
@@ -249,7 +262,7 @@ ChunkBundler::Flush()
void
ChunkBundler::Scrub(ScrubContext& Ctx)
{
- const uint64_t WindowSize = 4 * 1024 * 1024;
+ const uint64_t WindowSize = 4 * 1024 * 1024;
std::vector<CompactDiskIndexEntry> BigChunks;
std::vector<CompactDiskIndexEntry> BadChunks;
@@ -267,8 +280,8 @@ ChunkBundler::Scrub(ScrubContext& Ctx)
for (const auto& Block : m_OpenBlocks)
{
- uint64_t WindowStart = 0;
- uint64_t WindowEnd = WindowSize;
+ uint64_t WindowStart = 0;
+ uint64_t WindowEnd = WindowSize;
auto& SmallObjectFile = *Block.second;
const uint64_t FileSize = SmallObjectFile.FileSize();
@@ -292,11 +305,11 @@ ChunkBundler::Scrub(ScrubContext& Ctx)
continue;
}
- if (m_Validator && !m_Validator->ValidateChunk(
- IoBuffer(IoBuffer::Wrap,
- reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart,
- Entry.second.Size),
- Entry.first))
+ if (m_Validator &&
+ !m_Validator->ValidateChunk(IoBuffer(IoBuffer::Wrap,
+ reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart,
+ Entry.second.Size),
+ Entry.first))
{
// Hash mismatch
BadChunks.push_back({.Key = Entry.first, .Location = Entry.second});
@@ -327,7 +340,7 @@ ChunkBundler::Scrub(ScrubContext& Ctx)
}
*/
}
-#endif // 0
+#endif // 0
}
if (BadChunks.empty())
@@ -361,31 +374,40 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
{
namespace fs = std::filesystem;
- // Garbage collection will first remove any chunks that are flushed from the index.
- // It then tries to compact the existing small object file, it does this by
- // collecting all chunks that should be kept and sort them in position order.
- // It then steps from chunk to chunk and checks if there is space to move the last
- // chunk before the current chunk. It repeats this until it can't fit the last chunk
- // or the last chunk is the current chunk.
- // After this it check to see if there is space to move the current chunk closer to
- // the preceeding chunk (or beginning of file if there is no preceeding chunk).
- // It updates the new write position for any new chunks and rewrites the cas log
- // to match the new content of the store.
+ // It collects all the blocks that we want to delete chunks from. For each such
+ // block we keep a list of chunks to retain.
+ //
+ // It will first remove any chunks that are flushed from the m_LocationMap.
+ //
+ // It then checks to see if we want to purge any chunks that are in the currently
+ // active block. If so, we break off the current block and start on a new block,
+ // otherwise we just let the active block be.
//
- // It currently grabs a full lock during the GC operation but the compacting is
- // done gradually and can be stopped after each chunk if the GC operation needs to
- // be time limited. This will leave holes in the small object file that will not
- // be reclaimed unless a GC operation is executed again, but the state of the
- // cas store is intact.
+ // Next it will iterate over all blocks that we want to remove chunks from.
+ // If the block is empty after removal of chunks we mark the block as pending
+ // delete - we want to delete it as soon as there are no IoBuffers using the
+ // block file.
//
- // It is also possible to more fine-grained locking of GC operation when moving
- // blocks but that requires more work and additional checking if new blocks are
- // added betwen each move of a block.
+ // If the block is non-empty we write out the chunks we want to keep to a new
+ // block file (creating new block files as needed).
+ //
+ // We update the index as we complete each new block file. This makes it possible
+ // to break the GC if we want to limit time for execution.
+ //
+ // GC can run fairly parallel to regular operation - it will block while figuring
+ // out which chunks to remove and what blocks to rewrite but the actual
+ // reading and writing of data to new block files does not block regular operation.
+ //
+ // While moving blocks it will do a blocking operation and update the m_LocationMap
+ // after each new block is written and it will also block when figuring out the
+ // path to the next new block.
+
ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName);
- std::unordered_map<uint16_t, std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher>> KeepChunksPerBlock;
- std::vector<IoHash> DeletedChunks;
- std::unordered_set<uint16_t> BlocksToReWrite;
+ std::unordered_map<uint64_t, size_t> BlockIndexToKeepChunksMap;
+ std::vector<std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher>> KeepChunks;
+ std::vector<IoHash> DeletedChunks;
+ std::unordered_set<uint16_t> BlocksToReWrite;
{
RwLock::ExclusiveLockScope _i(m_InsertLock);
RwLock::ExclusiveLockScope _l(m_LocationMapLock);
@@ -409,12 +431,15 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
for (const auto& Entry : m_LocationMap)
{
TotalChunkHashes.push_back(Entry.first);
+ if (BlockIndexToKeepChunksMap.contains(Entry.second.BlockIndex))
+ {
+ continue;
+ }
+ BlockIndexToKeepChunksMap[Entry.second.BlockIndex] = KeepChunks.size();
+ KeepChunks.resize(KeepChunks.size() + 1);
}
- //std::vector<IoHash> ChunkHashes; // Same sort order as ChunkLocations
- //ChunkHashes.reserve(m_LocationMap.size());
-
- const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
uint64_t NewTotalSize = 0;
GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
@@ -422,7 +447,8 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
{
auto KeyIt = m_LocationMap.find(ChunkHash);
const auto& ChunkLocation = KeyIt->second;
- KeepChunksPerBlock[ChunkLocation.BlockIndex][ChunkHash] = ChunkLocation;
+ auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[ChunkLocation.BlockIndex]];
+ ChunkMap[ChunkHash] = ChunkLocation;
NewTotalSize += ChunkLocation.Size;
}
else
@@ -430,36 +456,8 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
DeletedChunks.push_back(ChunkHash);
}
});
- /*
- if (ChunkHashes.size() == TotalChunkCount)
- {
- ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete",
- TotalChunkCount,
- NiceBytes(TotalSize),
- m_RootDirectory / m_ContainerBaseName);
- return;
- }
-
- const uint64_t ChunkCount = ChunkHashes.size();
-
- std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
- auto LhsKeyIt = m_LocationMap.find(Lhs);
- auto RhsKeyIt = m_LocationMap.find(Rhs);
- return LhsKeyIt->second.Offset < RhsKeyIt->second.Offset;
- });
- uint64_t NewTotalSize = 0;
- std::vector<CompactDiskLocation> ChunkLocations;
- ChunkLocations.reserve(ChunkHashes.size());
- for (auto ChunkHash : ChunkHashes)
- {
- auto KeyIt = m_LocationMap.find(ChunkHash);
- const auto& ChunkLocation = KeyIt->second;
- ChunkLocations.push_back(ChunkLocation);
- NewTotalSize += ChunkLocation.Size;
- }
- */
- if (!CollectSmallObjects)
+ if (!PerformDelete)
{
ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
m_RootDirectory / m_ContainerBaseName,
@@ -480,174 +478,151 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.Size));
}
- if (BlocksToReWrite.contains(m_CurrentFileIndex))
- {
- m_CurrentFileIndex++;
- std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas");
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(path, true);
- m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile;
- m_CurrentBlock = SmallObjectFile;
- m_CurrentInsertOffset = 0;
- }
- }
-
- {
- // Rewrite all BlocksToReWrite
- for (auto BlockIndex : BlocksToReWrite)
+ if (BlocksToReWrite.contains(m_CurrentBlockIndex))
{
- std::shared_ptr<BasicFile> BlockFile;
- {
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- BlockFile = m_OpenBlocks[BlockIndex];
- }
- std::filesystem::path BlockPath = m_RootDirectory / "ucas" / (std::to_string(BlockIndex) + ".ucas");
- auto& KeepChunksForBlock = KeepChunksPerBlock[BlockIndex];
- if (KeepChunksForBlock.empty())
- {
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- BlockFile = m_OpenBlocks[BlockIndex];
- BlockFile->Close(); // TODO: We can't know that someone isn't holding a IoBuffer for this block at this point!
- m_OpenBlocks.erase(BlockIndex);
- fs::remove(BlockPath);
- }
- else
+ uint16_t NewBlockIndex = m_CurrentBlockIndex + 1;
+ while (m_OpenBlocks.contains(NewBlockIndex))
{
- std::filesystem::path TmpBlockPath = m_RootDirectory / "ucas" / (std::to_string(BlockIndex) + ".gc.ucas");
- auto TmpBlock = std::make_shared<BasicFile>();
- TmpBlock->Open(TmpBlockPath, true);
- std::vector<uint8_t> Chunk;
- uint64_t WriteOffset = 0;
-
- for (auto& Entry : KeepChunksForBlock)
- {
- const CompactDiskLocation& ChunkLocation = Entry.second;
- Chunk.resize(ChunkLocation.Size);
- BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
- CompactDiskLocation NewChunkLocation(ChunkLocation.BlockIndex,
- gsl::narrow<uint32_t>(WriteOffset),
- gsl::narrow<uint32_t>(Chunk.size()));
- TmpBlock->Write(Chunk.data(), Chunk.size(), NewChunkLocation.Offset);
- Entry.second = NewChunkLocation;
- WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment);
- }
- TmpBlock->Close();
-
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- RwLock::ExclusiveLockScope _l(m_LocationMapLock);
- for (const auto& Entry : KeepChunksForBlock)
+ NewBlockIndex++;
+ if (NewBlockIndex == m_CurrentBlockIndex)
{
- m_LocationMap[Entry.first] = Entry.second;
- m_OpLog.Append({.Key = Entry.first, .Location = Entry.second});
+ ZEN_ERROR("unable to allocate a new block in {}, count limit {} exceeded",
+ m_ContainerBaseName,
+ std::numeric_limits<uint16_t>::max() + 1);
+ return;
}
- BlockFile->Close(); // TODO: We can't know that someone isn't holding a IoBuffer for this block at this point!
- fs::remove(BlockPath);
- fs::rename(TmpBlockPath, BlockPath);
- BlockFile->Open(BlockPath, false);
}
+ m_CurrentBlockIndex = NewBlockIndex;
+ std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas");
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(path, true);
+ m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
+ m_CurrentBlock = SmallObjectFile;
+ m_CurrentInsertOffset = 0;
}
+ }
-#if 0
- // We can break here if we only want to remove items without compacting of space
+ // Move all chunks in blocks that have chunks removed to new blocks
- std::vector<IoHash> MovedChunks;
+ std::shared_ptr<BasicFile> NewBlockFile;
+ uint64_t WriteOffset = {};
+ uint16_t NewBlockIndex = {};
+ std::unordered_map<IoHash, CompactDiskLocation> MovedBlocks;
- uint64_t WriteOffset{};
- uint64_t ChunkIndex{};
- while (ChunkIndex < ChunkHashes.size())
+ for (auto BlockIndex : BlocksToReWrite)
+ {
+ auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[BlockIndex]];
+ if (ChunkMap.empty())
{
- IoHash ChunkHash = ChunkHashes[ChunkIndex];
- const auto& ChunkLocation = ChunkLocations[ChunkIndex];
-
- uint64_t NextChunkOffset = AlignPositon(ChunkLocation.GetOffset() + ChunkLocation.GetSize(), m_PayloadAlignment);
+ // The block has no references to it, it should be removed as soon as no reference is held on the file
+ // TODO: We currently don't know if someone is holding a IoBuffer for this block at this point!
+
+ //std::filesystem::path BlockPath = m_RootDirectory / "ucas" / (std::to_string(BlockIndex) + ".ucas");
+ //RwLock::ExclusiveLockScope _i(m_InsertLock);
+ //auto BlockFile = m_OpenBlocks[BlockIndex];
+ //m_OpenBlocks.erase(BlockIndex);
+ //BlockFile->Close();
+ //fs::remove(BlockPath);
+ continue;
+ }
- uint64_t FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset;
+ std::shared_ptr<BasicFile> BlockFile;
+ {
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ BlockFile = m_OpenBlocks[BlockIndex];
+ }
- // TODO: We could keep some wiggle room here, only try to find the last keep block if there is a reasonable amount of space free
- while (FreeChunkSize >= m_PayloadAlignment)
+ {
+ std::vector<uint8_t> Chunk;
+ for (auto& Entry : ChunkMap)
{
- // We should move as many keep chunk at the end as we can possibly fit
- uint64_t LastKeepChunkIndex = ChunkHashes.size() - 1;
- if (LastKeepChunkIndex == ChunkIndex)
- {
- break;
- }
+ const CompactDiskLocation& ChunkLocation = Entry.second;
+ Chunk.resize(ChunkLocation.Size);
+ BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
- IoHash LastChunkHash = ChunkHashes[LastKeepChunkIndex];
- const auto& LastChunkLocation = ChunkLocations[LastKeepChunkIndex];
- if (LastChunkLocation.GetSize() > FreeChunkSize)
+ if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
{
- break;
- }
-
- // Move the last chunk to our write location
- std::vector<uint8_t> Chunk;
- Chunk.resize(LastChunkLocation.GetSize());
- m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), LastChunkLocation.GetOffset());
- CompactDiskLocation NewChunkLocation(WriteOffset, Chunk.size());
- m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset());
+ {
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ if (NewBlockFile)
+ {
+ m_OpenBlocks[NewBlockIndex] = NewBlockFile;
+ RwLock::ExclusiveLockScope _l(m_LocationMapLock);
+ for (const auto& MovedEntry : MovedBlocks)
+ {
+ m_LocationMap[MovedEntry.first] = MovedEntry.second;
+ m_OpLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
+ }
+ }
+ NewBlockIndex = m_CurrentBlockIndex + 1;
+ while (m_OpenBlocks.contains(NewBlockIndex))
+ {
+ NewBlockIndex++;
+ if (NewBlockIndex == m_CurrentBlockIndex)
+ {
+ ZEN_ERROR("unable to allocate a new block in {}, count limit {} exceeded", m_ContainerBaseName, std::numeric_limits<uint16_t>::max()+1);
+ return;
+ }
+ }
+ m_OpenBlocks[NewBlockIndex] = std::shared_ptr<BasicFile>(); // Make sure nobody steals this slot
+ }
- CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation};
- m_OpLog.Append(IndexEntry);
- m_LocationMap[LastChunkHash] = NewChunkLocation;
- ChunkHashes.pop_back();
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_RootDirectory, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message());
+ return;
+ }
- WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment);
- FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset;
- MovedChunks.push_back(LastChunkHash);
+ if (Space.Free < (m_MaxBlockSize * 2)) // Never let GC steal the last block space
+ {
+ ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}",
+ m_RootDirectory / m_ContainerBaseName,
+ m_MaxBlockSize * 2,
+ NiceBytes(Space.Free));
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ m_OpenBlocks.erase(NewBlockIndex);
+ return;
+ }
- uint64_t LastChunkNextChunkOffset = AlignPositon(LastChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment);
- if (m_CurrentInsertOffset == LastChunkNextChunkOffset)
- {
- m_CurrentInsertOffset = LastChunkLocation.GetOffset();
+ std::filesystem::path NewBlockPath = m_RootDirectory / "ucas" / (std::to_string(NewBlockIndex) + ".ucas");
+ NewBlockFile = std::make_shared<BasicFile>();
+ NewBlockFile->Open(NewBlockPath, true);
+ MovedBlocks.clear();
+ WriteOffset = 0;
}
- }
- // TODO: We could keep some wiggle room here, don't move chunk if we only move it a very small amount
- if (FreeChunkSize > m_PayloadAlignment)
- {
- std::vector<uint8_t> Chunk;
- Chunk.resize(ChunkLocation.GetSize());
- m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), ChunkLocation.GetOffset());
- CompactDiskLocation NewChunkLocation(WriteOffset, Chunk.size());
- m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset());
-
- CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation};
- m_OpLog.Append(IndexEntry);
- m_LocationMap[ChunkHash] = NewChunkLocation;
-
- MovedChunks.push_back(ChunkHash);
- WriteOffset = AlignPositon(NewChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment);
- }
- else
- {
- WriteOffset = NextChunkOffset;
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ CompactDiskLocation NewChunkLocation(NewBlockIndex,
+ gsl::narrow<uint32_t>(WriteOffset),
+ gsl::narrow<uint32_t>(Chunk.size()));
+ Entry.second = {.BlockIndex = NewBlockIndex,
+ .Offset = gsl::narrow<uint32_t>(WriteOffset),
+ .Size = gsl::narrow<uint32_t>(Chunk.size())};
+ MovedBlocks[Entry.first] = Entry.second;
+ WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment);
}
+ Chunk.clear();
- // Update insert location if this is the last chunk in the file
- if (m_CurrentInsertOffset == NextChunkOffset)
+ // Remap moved chunks to the new block file
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ if (NewBlockFile)
{
- m_CurrentInsertOffset = WriteOffset;
+ m_OpenBlocks[NewBlockIndex] = NewBlockFile;
+ RwLock::ExclusiveLockScope _l(m_LocationMapLock);
+ for (const auto& MovedEntry : MovedBlocks)
+ {
+ m_LocationMap[MovedEntry.first] = MovedEntry.second;
+ m_OpLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
+ }
}
-
- // We can break here if we want to do incremental GC
-
- ChunkIndex++;
- }
-
- if (ChunkCount == 0)
- {
- m_CurrentInsertOffset = 0;
}
-#endif // 0
+ }
- GcCtx.DeletedCas(DeletedChunks);
+ GcCtx.DeletedCas(DeletedChunks);
- ZEN_INFO("garbage collection complete '{}', deleted {} chunks",
- m_RootDirectory / m_ContainerBaseName,
- DeletedChunks.size());
- // TODO: Should we truncate the file or just keep the size of the file and reuse the space?
- }
+ ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_RootDirectory / m_ContainerBaseName, DeletedChunks.size());
MakeIndexSnapshot();
}
@@ -931,29 +906,22 @@ TEST_CASE("chunkbundler.gc.compact")
ScopedTemporaryDirectory TempDir;
ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
- ChunkBundlerStore.Initialize("cb", 65536, 1 << 4, true);
+ ChunkBundlerStore.Initialize("cb", 2048, 1 << 4, true);
uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
- IoBuffer Chunks[9] = {CreateChunk(ChunkSizes[0]),
- CreateChunk(ChunkSizes[1]),
- CreateChunk(ChunkSizes[2]),
- CreateChunk(ChunkSizes[3]),
- CreateChunk(ChunkSizes[4]),
- CreateChunk(ChunkSizes[5]),
- CreateChunk(ChunkSizes[6]),
- CreateChunk(ChunkSizes[7]),
- CreateChunk(ChunkSizes[8])};
- IoHash ChunkHashes[9] = {
- IoHash::HashBuffer(Chunks[0].Data(), Chunks[0].Size()),
- IoHash::HashBuffer(Chunks[1].Data(), Chunks[1].Size()),
- IoHash::HashBuffer(Chunks[2].Data(), Chunks[2].Size()),
- IoHash::HashBuffer(Chunks[3].Data(), Chunks[3].Size()),
- IoHash::HashBuffer(Chunks[4].Data(), Chunks[4].Size()),
- IoHash::HashBuffer(Chunks[5].Data(), Chunks[5].Size()),
- IoHash::HashBuffer(Chunks[6].Data(), Chunks[6].Size()),
- IoHash::HashBuffer(Chunks[7].Data(), Chunks[7].Size()),
- IoHash::HashBuffer(Chunks[8].Data(), Chunks[8].Size()),
- };
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(9);
+ for (const auto& Size : ChunkSizes)
+ {
+ Chunks.push_back(CreateChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(9);
+ for (const auto& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
CHECK(ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]).New);
CHECK(ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]).New);
@@ -1110,6 +1078,42 @@ TEST_CASE("chunkbundler.gc.compact")
ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]);
}
+ // Keep every other
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[0]);
+ KeepChunks.push_back(ChunkHashes[2]);
+ KeepChunks.push_back(ChunkHashes[4]);
+ KeepChunks.push_back(ChunkHashes[6]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ ChunkBundlerStore.CollectGarbage(GcCtx);
+
+ CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0]));
+ CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1]));
+ CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[2]));
+ CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3]));
+ CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4]));
+ CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5]));
+ CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6]));
+ CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7]));
+ CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[2] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[2])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8])));
+
+ ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]);
+ ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]);
+ ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]);
+ ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
// Verify that we nicely appended blocks even after all GC operations
CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0])));
CHECK(ChunkHashes[1] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[1])));
@@ -1125,8 +1129,111 @@ TEST_CASE("chunkbundler.gc.compact")
CHECK(InitialSize == FinalSize);
}
-void
-chunkbundler_forcelink()
+TEST_CASE("chunkbundler.gc.deleteblockonopen")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(20);
+ for (const auto& Size : ChunkSizes)
+ {
+ Chunks.push_back(CreateChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(20);
+ for (const auto& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ {
+ ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
+ ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, true);
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(ChunkBundlerStore.InsertChunk(Chunks[i], ChunkHashes[i]).New);
+ }
+
+ // GC every other block
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ for (size_t i = 0; i < 20; i += 2)
+ {
+ KeepChunks.push_back(ChunkHashes[i]);
+ }
+ GcCtx.ContributeCas(KeepChunks);
+
+ ChunkBundlerStore.CollectGarbage(GcCtx);
+
+ for (size_t i = 0; i < 20; i += 2)
+ {
+ CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[i]));
+ CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i+1]));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[i])));
+ }
+ }
+ }
+ {
+ // Re-open
+ ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
+ ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, false);
+ for (size_t i = 0; i < 20; i += 2)
+ {
+ CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[i]));
+ CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i + 1]));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[i])));
+ }
+ }
+}
+
+TEST_CASE("chunkbundler.gc.handleopeniobuffer")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(20);
+ for (const auto& Size : ChunkSizes)
+ {
+ Chunks.push_back(CreateChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(20);
+ for (const auto& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
+ ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, true);
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(ChunkBundlerStore.InsertChunk(Chunks[i], ChunkHashes[i]).New);
+ }
+
+ auto RetainChunk = ChunkBundlerStore.FindChunk(ChunkHashes[5]);
+
+ // GC everything
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ ChunkBundlerStore.CollectGarbage(GcCtx);
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i]));
+ }
+
+ CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk));
+}
+
+void chunkbundler_forcelink()
{
}