aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-30 01:29:24 +0200
committerDan Engelbrecht <[email protected]>2022-04-30 09:41:24 +0200
commitde6057de814a4dc16654bdda84f697476b2ebef5 (patch)
tree7f5fac98b79a68e98b3effc91162379438682c8d
parentMerge remote-tracking branch 'origin/main' into de/cache-with-block-store (diff)
downloadzen-de6057de814a4dc16654bdda84f697476b2ebef5.tar.xz
zen-de6057de814a4dc16654bdda84f697476b2ebef5.zip
first pass at generic block store with gc
-rw-r--r--zenstore/blockstore.cpp508
-rw-r--r--zenstore/compactcas.cpp538
-rw-r--r--zenstore/compactcas.h7
-rw-r--r--zenstore/include/zenstore/blockstore.h41
4 files changed, 634 insertions, 460 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index 1eb859d5a..a897ed902 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -2,6 +2,9 @@
#include "compactcas.h"
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zenstore/blockstore.h>
#if ZEN_WITH_TESTS
@@ -108,6 +111,511 @@ BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::functio
m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
}
+namespace {
+ const char* DataExtension = ".ucas";
+
+ std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
+ {
+ ExtendablePathBuilder<256> Path;
+
+ char BlockHexString[9];
+ ToHexNumber(BlockIndex, BlockHexString);
+
+ Path.Append(BlocksBasePath);
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
+ Path.AppendSeparator();
+ Path.Append(BlockHexString);
+ Path.Append(DataExtension);
+ return Path.ToPath();
+ }
+} // namespace
+
+void
+BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ const std::vector<BlockStoreLocation>& KnownLocations)
+{
+ ZEN_ASSERT(MaxBlockSize > 0);
+ ZEN_ASSERT(MaxBlockCount > 0);
+ ZEN_ASSERT(IsPow2(MaxBlockCount));
+
+ m_BlocksBasePath = BlocksBasePath;
+ m_MaxBlockSize = MaxBlockSize;
+
+ m_TotalSize = 0;
+ m_ChunkBlocks.clear();
+
+ std::unordered_set<uint32_t> KnownBlocks;
+ for (const auto& Entry : KnownLocations)
+ {
+ m_TotalSize.fetch_add(Entry.Size, std::memory_order_seq_cst);
+ KnownBlocks.insert(Entry.BlockIndex);
+ }
+
+ if (std::filesystem::is_directory(m_BlocksBasePath))
+ {
+ std::vector<std::filesystem::path> FoldersToScan;
+ FoldersToScan.push_back(m_BlocksBasePath);
+ size_t FolderOffset = 0;
+ while (FolderOffset < FoldersToScan.size())
+ {
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
+ {
+ if (Entry.is_directory())
+ {
+ FoldersToScan.push_back(Entry.path());
+ continue;
+ }
+ if (Entry.is_regular_file())
+ {
+ const std::filesystem::path Path = Entry.path();
+ if (Path.extension() != DataExtension)
+ {
+ continue;
+ }
+ std::string FileName = Path.stem().string();
+ uint32_t BlockIndex;
+ bool OK = ParseHexNumber(FileName, BlockIndex);
+ if (!OK)
+ {
+ continue;
+ }
+ if (!KnownBlocks.contains(BlockIndex))
+ {
+ // Log removing unreferenced block
+ // Clear out unused blocks
+ ZEN_INFO("removing unused block for '{}' at '{}'", m_BlocksBasePath, Path);
+ std::error_code Ec;
+ std::filesystem::remove(Path, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
+ }
+ continue;
+ }
+ Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
+ BlockFile->Open();
+ m_ChunkBlocks[BlockIndex] = BlockFile;
+ }
+ }
+ ++FolderOffset;
+ }
+ }
+ else
+ {
+ CreateDirectories(m_BlocksBasePath);
+ }
+}
+
+BlockStoreLocation
+BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
+{
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ bool IsWriting = m_WriteBlock != nullptr;
+ if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize)
+ {
+ if (m_WriteBlock)
+ {
+ m_WriteBlock = nullptr;
+ }
+ {
+ if (m_ChunkBlocks.size() == m_MaxBlockCount)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
+ }
+ WriteBlockIndex += IsWriting ? 1 : 0;
+ while (m_ChunkBlocks.contains(WriteBlockIndex))
+ {
+ WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+ }
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ m_WriteBlock = new BlockStoreFile(BlockPath);
+ m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ }
+ m_CurrentInsertOffset = 0;
+ m_WriteBlock->Create(m_MaxBlockSize);
+ }
+ uint64_t InsertOffset = m_CurrentInsertOffset;
+ m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
+ Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ InsertLock.ReleaseNow();
+
+ BlockStoreLocation Location{.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size};
+ WriteBlock->Write(Data, Size, InsertOffset);
+
+ return Location;
+}
+
+/*
+IoBuffer
+BlockStore::ReadChunk(const BlockStoreLocation& Location)
+{
+ RwLock::SharedLockScope InsertLock(m_InsertLock);
+ Ref<BlockStoreFile> ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
+ InsertLock.ReleaseNow();
+ return ChunkBlock->GetChunk(Location.Offset, Location.Size);
+}
+*/
+
+Ref<BlockStoreFile>
+BlockStore::GetChunkBlock(const BlockStoreLocation& Location)
+{
+ RwLock::SharedLockScope InsertLock(m_InsertLock);
+ return m_ChunkBlocks[Location.BlockIndex];
+}
+
+void
+BlockStore::Flush()
+{
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ if (m_CurrentInsertOffset > 0)
+ {
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+ m_WriteBlock = nullptr;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
+ }
+}
+
+// TODO: Almost there - some bug remain and API might need tweaking
+void
+BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
+ const std::vector<size_t>& KeepChunkIndexes,
+ uint64_t PayloadAlignment,
+ bool DryRun,
+ const ReclaimCallback& Callback)
+{
+ if (ChunkLocations.empty())
+ {
+ return;
+ }
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = ChunkLocations.size();
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+ uint64_t MovedCount = 0;
+ uint64_t DeletedCount = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([this,
+ &TotalTimer,
+ &WriteBlockTimeUs,
+ &WriteBlockLongestTimeUs,
+ &ReadBlockTimeUs,
+ &ReadBlockLongestTimeUs,
+ &TotalChunkCount,
+ &DeletedCount,
+ &MovedCount,
+ &DeletedSize,
+ OldTotalSize] {
+ ZEN_INFO(
+ "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+ "#{} "
+ "of #{} "
+ "chunks ({}).",
+ m_BlocksBasePath,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedCount,
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ });
+
+ size_t BlockCount = 0;
+ uint64_t ExcludeBlockIndex = 0x800000000ull;
+ {
+ RwLock::ExclusiveLockScope __(m_InsertLock);
+ if (m_WriteBlock)
+ {
+ ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ }
+ BlockCount = m_ChunkBlocks.size();
+ }
+
+ std::unordered_map<size_t, BlockStoreLocation> LocationLookup;
+ LocationLookup.reserve(TotalChunkCount);
+
+ std::unordered_set<size_t> KeepChunkMap;
+ KeepChunkMap.reserve(KeepChunkIndexes.size());
+ for (size_t KeepChunkIndex : KeepChunkIndexes)
+ {
+ const BlockStoreLocation& Location = ChunkLocations[KeepChunkIndex];
+ if (Location.BlockIndex == ExcludeBlockIndex)
+ {
+ continue;
+ }
+ KeepChunkMap.insert(KeepChunkIndex);
+ }
+ std::unordered_set<size_t> DeleteChunkMap;
+ DeleteChunkMap.reserve(ChunkLocations.size() - KeepChunkIndexes.size());
+
+ std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
+ std::vector<std::vector<size_t>> KeepChunks;
+ std::vector<std::vector<size_t>> DeleteChunks;
+
+ BlockIndexToChunkMapIndex.reserve(BlockCount);
+ KeepChunks.reserve(BlockCount);
+ DeleteChunks.reserve(BlockCount);
+ size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
+
+ size_t DeleteCount = 0;
+ uint64_t NewTotalSize = 0;
+ for (size_t Index = 0; Index < TotalChunkCount; ++Index)
+ {
+ const BlockStoreLocation& Location = ChunkLocations[Index];
+ LocationLookup[Index] = Location;
+ if (Location.BlockIndex == ExcludeBlockIndex)
+ {
+ continue;
+ }
+
+ auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex);
+ size_t ChunkMapIndex = 0;
+ if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
+ {
+ ChunkMapIndex = KeepChunks.size();
+ BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex;
+ KeepChunks.resize(ChunkMapIndex + 1);
+ KeepChunks.back().reserve(GuesstimateCountPerBlock);
+ DeleteChunks.resize(ChunkMapIndex + 1);
+ DeleteChunks.back().reserve(GuesstimateCountPerBlock);
+ }
+ else
+ {
+ ChunkMapIndex = BlockIndexPtr->second;
+ }
+
+ if (KeepChunkMap.contains(Index))
+ {
+ std::vector<size_t>& IndexMap = KeepChunks[ChunkMapIndex];
+ IndexMap.push_back(Index);
+ NewTotalSize += Location.Size;
+ continue;
+ }
+ std::vector<size_t>& IndexMap = DeleteChunks[ChunkMapIndex];
+ IndexMap.push_back(Index);
+ DeleteCount++;
+ }
+
+ std::unordered_set<uint32_t> BlocksToReWrite;
+ BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
+ for (const auto& Entry : BlockIndexToChunkMapIndex)
+ {
+ uint32_t BlockIndex = Entry.first;
+ size_t ChunkMapIndex = Entry.second;
+ const std::vector<size_t>& ChunkMap = DeleteChunks[ChunkMapIndex];
+ if (ChunkMap.empty())
+ {
+ continue;
+ }
+ BlocksToReWrite.insert(BlockIndex);
+ }
+
+ if (DryRun)
+ {
+ uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
+ ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_BlocksBasePath,
+ DeleteCount,
+ NiceBytes(TotalSize - NewTotalSize),
+ TotalChunkCount,
+ NiceBytes(TotalSize));
+ return;
+ }
+
+ std::unordered_map<size_t, BlockStoreLocation> MovedChunks;
+ std::vector<size_t> RemovedChunks;
+
+ Ref<BlockStoreFile> NewBlockFile;
+ uint64_t WriteOffset = 0;
+ uint32_t NewBlockIndex = 0;
+
+ for (uint32_t BlockIndex : BlocksToReWrite)
+ {
+ const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
+
+ Ref<BlockStoreFile> OldBlockFile;
+ {
+ RwLock::SharedLockScope _i(m_InsertLock);
+ OldBlockFile = m_ChunkBlocks[BlockIndex];
+ ZEN_ASSERT(OldBlockFile);
+ }
+
+ const std::vector<size_t>& KeepMap = KeepChunks[ChunkMapIndex];
+ if (KeepMap.empty())
+ {
+ const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ for (size_t DeleteIndex : DeleteMap)
+ {
+ RemovedChunks.push_back(DeleteIndex);
+ DeletedSize += ChunkLocations[DeleteIndex].Size;
+ DeletedCount++;
+ }
+ Callback(MovedChunks, RemovedChunks);
+ MovedChunks.clear();
+ RemovedChunks.clear();
+ {
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ }
+ ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_BlocksBasePath, BlockIndex, OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+ }
+ continue;
+ }
+
+ std::vector<uint8_t> Chunk;
+ for (const size_t& ChunkIndex : KeepMap)
+ {
+ const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
+ Chunk.resize(ChunkLocation.Size);
+ OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+
+ if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
+ {
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ }
+ {
+ Callback(MovedChunks, RemovedChunks);
+ MovedChunks.clear();
+ RemovedChunks.clear();
+ RwLock::ExclusiveLockScope __(m_InsertLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_ChunkBlocks.size() == m_MaxBlockCount)
+ {
+ ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
+ m_BlocksBasePath,
+ static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ return;
+ }
+ while (m_ChunkBlocks.contains(NextBlockIndex))
+ {
+ NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1);
+ }
+ std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+ }
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
+ return;
+ }
+ if (Space.Free < m_MaxBlockSize)
+ {
+ uint64_t ReclaimedSpace = 0; // GcCtx.ClaimGCReserve();
+ if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ {
+ ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
+ m_BlocksBasePath,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ RwLock::ExclusiveLockScope _l(m_InsertLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ m_ChunkBlocks.erase(NextBlockIndex);
+ return;
+ }
+
+ ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ m_BlocksBasePath,
+ ReclaimedSpace,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ }
+ NewBlockFile->Create(m_MaxBlockSize);
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
+ }
+
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ MovedChunks[ChunkIndex] = {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()};
+ WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
+ MovedCount++;
+ }
+ Chunk.clear();
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ NewBlockFile = {};
+ }
+
+ const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ for (size_t DeleteIndex : DeleteMap)
+ {
+ RemovedChunks.push_back(DeleteIndex);
+ DeletedSize += ChunkLocations[DeleteIndex].Size;
+ DeletedCount++;
+ }
+
+ Callback(MovedChunks, RemovedChunks);
+ MovedChunks.clear();
+ RemovedChunks.clear();
+ {
+ RwLock::ExclusiveLockScope __(m_InsertLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ }
+ ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_BlocksBasePath, BlockIndex, OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+ }
+ OldBlockFile = nullptr;
+ }
+
+ return;
+}
+
#if ZEN_WITH_TESTS
static bool
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 920ed965f..2b48eb143 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -263,53 +263,12 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
- uint32_t WriteBlockIndex;
- Ref<BlockStoreFile> WriteBlock;
- uint64_t InsertOffset;
{
- RwLock::ExclusiveLockScope _(m_InsertLock);
-
- {
- RwLock::SharedLockScope __(m_LocationMapLock);
- if (m_LocationMap.contains(ChunkHash))
- {
- return CasStore::InsertResult{.New = false};
- }
- }
-
- // New entry
-
- WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- bool IsWriting = m_WriteBlock != nullptr;
- if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize)
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ if (m_LocationMap.contains(ChunkHash))
{
- if (m_WriteBlock)
- {
- m_WriteBlock = nullptr;
- }
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
- {
- throw std::runtime_error(
- fmt::format("unable to allocate a new block in '{}'", m_Config.RootDirectory / m_ContainerBaseName));
- }
- WriteBlockIndex += IsWriting ? 1 : 0;
- while (m_ChunkBlocks.contains(WriteBlockIndex))
- {
- WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
- }
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- m_WriteBlock = new BlockStoreFile(BlockPath);
- m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
- }
- m_CurrentInsertOffset = 0;
- m_WriteBlock->Create(m_MaxBlockSize);
+ return CasStore::InsertResult{.New = false};
}
- InsertOffset = m_CurrentInsertOffset;
- m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment);
- WriteBlock = m_WriteBlock;
}
// We can end up in a situation that InsertChunk writes the same chunk data in
@@ -324,17 +283,15 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
// This should be a rare occasion and the current flow reduces the time we block for
// reads, insert and GC.
- BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment);
- const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
-
- WriteBlock->Write(ChunkData, ChunkSize, InsertOffset);
+ BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment);
+ BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
+ const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
m_CasLog.Append(IndexEntry);
-
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
{
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, Location);
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, DiskLocation);
}
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
return CasStore::InsertResult{.New = true};
}
@@ -348,20 +305,16 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
- Ref<BlockStoreFile> ChunkBlock;
- BlockStoreLocation Location;
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ auto KeyIt = m_LocationMap.find(ChunkHash);
+ if (KeyIt == m_LocationMap.end())
{
- RwLock::SharedLockScope _(m_LocationMapLock);
- if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
- {
- Location = KeyIt->second.Get(m_PayloadAlignment);
- ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
- }
- else
- {
- return IoBuffer();
- }
+ return IoBuffer();
}
+ BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment);
+ Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); // m_ChunkBlocks[Location.BlockIndex];
+ _.ReleaseNow();
+
return ChunkBlock->GetChunk(Location.Offset, Location.Size);
}
@@ -388,7 +341,8 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
CasContainerStrategy::Flush()
{
- {
+ m_BlockStore.Flush();
+ /* {
RwLock::ExclusiveLockScope _(m_InsertLock);
if (m_CurrentInsertOffset > 0)
{
@@ -398,13 +352,15 @@ CasContainerStrategy::Flush()
m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
m_CurrentInsertOffset = 0;
}
- }
+ }*/
MakeIndexSnapshot();
}
void
CasContainerStrategy::Scrub(ScrubContext& Ctx)
{
+ ZEN_UNUSED(Ctx);
+#if 0
std::vector<CasDiskIndexEntry> BadChunks;
// We do a read sweep through the payloads file and validate
@@ -508,118 +464,31 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
-
Ctx.ReportBadCasChunks(BadChunkHashes);
+#endif // 0
}
void
CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
{
- // It collects all the blocks that we want to delete chunks from. For each such
- // block we keep a list of chunks to retain and a list of chunks to delete.
- //
- // If there is a block that we are currently writing to, that block is omitted
- // from the garbage collection.
- //
- // Next it will iterate over all blocks that we want to remove chunks from.
- // If the block is empty after removal of chunks we mark the block as pending
- // delete - we want to delete it as soon as there are no IoBuffers using the
- // block file.
- // Once complete we update the m_LocationMap by removing the chunks.
- //
- // If the block is non-empty we write out the chunks we want to keep to a new
- // block file (creating new block files as needed).
- //
- // We update the index as we complete each new block file. This makes it possible
- // to break the GC if we want to limit time for execution.
- //
- // GC can fairly parallell to regular operation - it will block while taking
- // a snapshot of the current m_LocationMap state.
- //
- // While moving blocks it will do a blocking operation and update the m_LocationMap
- // after each new block is written and figuring out the path to the next new block.
-
- ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
uint64_t WriteBlockTimeUs = 0;
uint64_t WriteBlockLongestTimeUs = 0;
uint64_t ReadBlockTimeUs = 0;
uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = 0;
- uint64_t DeletedSize = 0;
- uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
-
- std::vector<IoHash> DeletedChunks;
- uint64_t MovedCount = 0;
-
- Stopwatch TotalTimer;
- const auto _ = MakeGuard([this,
- &TotalTimer,
- &WriteBlockTimeUs,
- &WriteBlockLongestTimeUs,
- &ReadBlockTimeUs,
- &ReadBlockLongestTimeUs,
- &TotalChunkCount,
- &DeletedChunks,
- &MovedCount,
- &DeletedSize,
- OldTotalSize] {
- ZEN_INFO(
- "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
- "#{} "
- "of #{} "
- "chunks ({}).",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs),
- NiceBytes(DeletedSize),
- DeletedChunks.size(),
- MovedCount,
- TotalChunkCount,
- NiceBytes(OldTotalSize));
- });
LocationMap_t LocationMap;
- size_t BlockCount;
- uint64_t ExcludeBlockIndex = 0x800000000ull;
{
- RwLock::SharedLockScope __(m_InsertLock);
RwLock::SharedLockScope ___(m_LocationMapLock);
- {
- Stopwatch Timer;
- const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (m_WriteBlock)
- {
- ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- }
- __.ReleaseNow();
- }
- LocationMap = m_LocationMap;
- BlockCount = m_ChunkBlocks.size();
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ LocationMap = m_LocationMap;
}
- if (LocationMap.empty())
- {
- ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName);
- return;
- }
-
- TotalChunkCount = LocationMap.size();
-
- std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
- std::vector<std::vector<IoHash>> KeepChunks;
- std::vector<std::vector<IoHash>> DeleteChunks;
-
- BlockIndexToChunkMapIndex.reserve(BlockCount);
- KeepChunks.reserve(BlockCount);
- DeleteChunks.reserve(BlockCount);
- size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
+ uint64_t TotalChunkCount = LocationMap.size();
std::vector<IoHash> TotalChunkHashes;
TotalChunkHashes.reserve(TotalChunkCount);
@@ -628,272 +497,83 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
TotalChunkHashes.push_back(Entry.first);
}
- uint64_t DeleteCount = 0;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ std::vector<size_t> KeepChunkIndexes;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
- uint64_t NewTotalSize = 0;
GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
- auto KeyIt = LocationMap.find(ChunkHash);
- const BlockStoreDiskLocation& Location = KeyIt->second;
- uint32_t BlockIndex = Location.GetBlockIndex();
+ auto KeyIt = LocationMap.find(ChunkHash);
+ const BlockStoreDiskLocation& DiskLocation = KeyIt->second;
+ BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
- if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex)
- {
- return;
- }
-
- auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex);
- size_t ChunkMapIndex = 0;
- if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
- {
- ChunkMapIndex = KeepChunks.size();
- BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex;
- KeepChunks.resize(ChunkMapIndex + 1);
- KeepChunks.back().reserve(GuesstimateCountPerBlock);
- DeleteChunks.resize(ChunkMapIndex + 1);
- DeleteChunks.back().reserve(GuesstimateCountPerBlock);
- }
- else
- {
- ChunkMapIndex = BlockIndexPtr->second;
- }
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
if (Keep)
{
- std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex];
- ChunkMap.push_back(ChunkHash);
- NewTotalSize += Location.GetSize();
- }
- else
- {
- std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
- ChunkMap.push_back(ChunkHash);
- DeleteCount++;
+ KeepChunkIndexes.push_back(ChunkIndex);
}
});
- std::unordered_set<uint32_t> BlocksToReWrite;
- BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
- for (const auto& Entry : BlockIndexToChunkMapIndex)
- {
- uint32_t BlockIndex = Entry.first;
- size_t ChunkMapIndex = Entry.second;
- const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
- if (ChunkMap.empty())
- {
- continue;
- }
- BlocksToReWrite.insert(BlockIndex);
- }
-
const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
if (!PerformDelete)
{
- uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
- ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- DeleteCount,
- NiceBytes(TotalSize - NewTotalSize),
- TotalChunkCount,
- NiceBytes(TotalSize));
+ m_BlockStore.ReclaimSpace(ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
return;
}
-
- // Move all chunks in blocks that have chunks removed to new blocks
-
- Ref<BlockStoreFile> NewBlockFile;
- uint64_t WriteOffset = 0;
- uint32_t NewBlockIndex = 0;
- DeletedChunks.reserve(DeleteCount);
-
- auto UpdateLocations = [this](const std::span<CasDiskIndexEntry>& Entries) {
- for (const CasDiskIndexEntry& Entry : Entries)
- {
- if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+ std::vector<IoHash> DeletedChunks;
+ m_BlockStore.ReclaimSpace(
+ ChunkLocations,
+ KeepChunkIndexes,
+ m_PayloadAlignment,
+ false,
+ [this, &DeletedChunks, &ChunkIndexToChunkHash, &LocationMap, &ReadBlockTimeUs, &ReadBlockLongestTimeUs](
+ const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks,
+ const std::vector<size_t> RemovedChunks) {
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- auto KeyIt = m_LocationMap.find(Entry.Key);
- uint64_t ChunkSize = KeyIt->second.GetSize();
- m_TotalSize.fetch_sub(ChunkSize);
- m_LocationMap.erase(KeyIt);
- continue;
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}});
+ }
+ for (const size_t ChunkIndex : RemovedChunks)
+ {
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash];
+ LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone});
+ DeletedChunks.push_back(ChunkHash);
}
- m_LocationMap[Entry.Key] = Entry.Location;
- }
- };
-
- std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks;
- for (uint32_t BlockIndex : BlocksToReWrite)
- {
- const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
-
- Ref<BlockStoreFile> OldBlockFile;
- {
- RwLock::SharedLockScope _i(m_LocationMapLock);
- OldBlockFile = m_ChunkBlocks[BlockIndex];
- }
- const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex];
- if (KeepMap.empty())
- {
- const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
- std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap);
m_CasLog.Append(LogEntries);
m_CasLog.Flush();
{
- RwLock::ExclusiveLockScope _i(m_LocationMapLock);
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
Stopwatch Timer;
- const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ const auto ____ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
ReadBlockTimeUs += ElapsedUs;
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
- UpdateLocations(LogEntries);
- m_ChunkBlocks[BlockIndex] = nullptr;
- }
- DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
- ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'",
- m_ContainerBaseName,
- BlockIndex,
- OldBlockFile->GetPath());
- std::error_code Ec;
- OldBlockFile->MarkAsDeleteOnClose(Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
- }
- continue;
- }
-
- std::vector<uint8_t> Chunk;
- for (const IoHash& ChunkHash : KeepMap)
- {
- auto KeyIt = LocationMap.find(ChunkHash);
- const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment);
- Chunk.resize(ChunkLocation.Size);
- OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
-
- if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
- {
- uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
- std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {});
- m_CasLog.Append(LogEntries);
- m_CasLog.Flush();
-
- if (NewBlockFile)
+ for (const CasDiskIndexEntry& Entry : LogEntries)
{
- NewBlockFile->Truncate(WriteOffset);
- NewBlockFile->Flush();
- }
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- UpdateLocations(LogEntries);
- if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
- {
- ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
- m_Config.RootDirectory / m_ContainerBaseName,
- static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
- return;
- }
- while (m_ChunkBlocks.contains(NextBlockIndex))
+ if (Entry.Flags & CasDiskIndexEntry::kTombstone)
{
- NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
- }
- std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
- NewBlockFile = new BlockStoreFile(NewBlockPath);
- m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
- }
-
- MovedCount += MovedBlockChunks.size();
- MovedBlockChunks.clear();
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
- return;
- }
- if (Space.Free < m_MaxBlockSize)
- {
- uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve();
- if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
- {
- ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- m_MaxBlockSize,
- NiceBytes(Space.Free + ReclaimedSpace));
- RwLock::ExclusiveLockScope _l(m_LocationMapLock);
- Stopwatch Timer;
- const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- m_ChunkBlocks.erase(NextBlockIndex);
- return;
+ m_LocationMap.erase(Entry.Key);
+ auto KeyIt = m_LocationMap.find(Entry.Key);
+ uint64_t ChunkSize = Entry.Location.GetSize();
+ m_TotalSize.fetch_sub(ChunkSize);
+ continue;
}
-
- ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- ReclaimedSpace,
- NiceBytes(Space.Free + ReclaimedSpace));
+ m_LocationMap[Entry.Key] = Entry.Location;
}
- NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
}
-
- NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
- MovedBlockChunks.emplace(
- ChunkHash,
- BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment));
- WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment);
- }
- Chunk.clear();
- if (NewBlockFile)
- {
- NewBlockFile->Truncate(WriteOffset);
- NewBlockFile->Flush();
- NewBlockFile = {};
- }
-
- const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
- std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap);
- m_CasLog.Append(LogEntries);
- m_CasLog.Flush();
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- UpdateLocations(LogEntries);
- m_ChunkBlocks[BlockIndex] = nullptr;
- }
- MovedCount += MovedBlockChunks.size();
- DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
- MovedBlockChunks.clear();
-
- ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_ContainerBaseName, BlockIndex, OldBlockFile->GetPath());
- std::error_code Ec;
- OldBlockFile->MarkAsDeleteOnClose(Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
- }
- OldBlockFile = nullptr;
- }
-
- for (const IoHash& ChunkHash : DeletedChunks)
- {
- DeletedSize += LocationMap[ChunkHash].GetSize();
- }
+ });
GcCtx.DeletedCas(DeletedChunks);
}
@@ -935,7 +615,6 @@ CasContainerStrategy::MakeIndexSnapshot()
std::vector<CasDiskIndexEntry> Entries;
{
- RwLock::SharedLockScope __(m_InsertLock);
RwLock::SharedLockScope ___(m_LocationMapLock);
Entries.resize(m_LocationMap.size());
@@ -1480,67 +1159,18 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- std::unordered_set<uint32_t> KnownBlocks;
+ std::unordered_set<uint32_t> KnownBlocks;
+ std::vector<BlockStoreLocation> KnownLocations;
+ KnownLocations.reserve(m_LocationMap.size());
for (const auto& Entry : m_LocationMap)
{
const BlockStoreDiskLocation& Location = Entry.second;
m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_seq_cst);
KnownBlocks.insert(Location.GetBlockIndex());
+ KnownLocations.push_back(Location.Get(m_PayloadAlignment));
}
- if (std::filesystem::is_directory(m_BlocksBasePath))
- {
- std::vector<std::filesystem::path> FoldersToScan;
- FoldersToScan.push_back(m_BlocksBasePath);
- size_t FolderOffset = 0;
- while (FolderOffset < FoldersToScan.size())
- {
- for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
- {
- if (Entry.is_directory())
- {
- FoldersToScan.push_back(Entry.path());
- continue;
- }
- if (Entry.is_regular_file())
- {
- const std::filesystem::path Path = Entry.path();
- if (Path.extension() != DataExtension)
- {
- continue;
- }
- std::string FileName = Path.stem().string();
- uint32_t BlockIndex;
- bool OK = ParseHexNumber(FileName, BlockIndex);
- if (!OK)
- {
- continue;
- }
- if (!KnownBlocks.contains(BlockIndex))
- {
- // Log removing unreferenced block
- // Clear out unused blocks
- ZEN_INFO("removing unused block for '{}' at '{}'", m_ContainerBaseName, Path);
- std::error_code Ec;
- std::filesystem::remove(Path, Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
- }
- continue;
- }
- Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
- BlockFile->Open();
- m_ChunkBlocks[BlockIndex] = BlockFile;
- }
- }
- ++FolderOffset;
- }
- }
- else
- {
- CreateDirectories(m_BlocksBasePath);
- }
+ m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0))
{
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index 11da37202..114a6a48c 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -78,17 +78,12 @@ private:
TCasLogFile<CasDiskIndexEntry> m_CasLog;
std::string m_ContainerBaseName;
std::filesystem::path m_BlocksBasePath;
+ BlockStore m_BlockStore;
RwLock m_LocationMapLock;
typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t;
LocationMap_t m_LocationMap;
- std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
- RwLock m_InsertLock; // used to serialize inserts
- Ref<BlockStoreFile> m_WriteBlock;
- std::uint64_t m_CurrentInsertOffset = 0;
-
- std::atomic_uint32_t m_WriteBlockIndex{};
std::atomic_uint64_t m_TotalSize{};
};
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
index 424db461a..4dd6e5289 100644
--- a/zenstore/include/zenstore/blockstore.h
+++ b/zenstore/include/zenstore/blockstore.h
@@ -15,8 +15,14 @@ struct BlockStoreLocation
uint32_t BlockIndex;
uint64_t Offset;
uint64_t Size;
+
+ inline auto operator<=>(const BlockStoreLocation& Rhs) const = default;
};
+constexpr BlockStoreLocation InvalidBlockStoreLocation{.BlockIndex = 0xfffffffful,
+ .Offset = 0xffffffffffffffffull,
+ .Size = 0xffffffffffffffffull};
+
#pragma pack(push)
#pragma pack(1)
@@ -99,6 +105,41 @@ private:
BasicFile m_File;
};
+class BlockStore
+{
+public:
+ void Initialize(const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ const std::vector<BlockStoreLocation>& KnownLocations);
+ BlockStoreLocation WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment);
+ Ref<BlockStoreFile> GetChunkBlock(const BlockStoreLocation& Location);
+ void Flush();
+
+ typedef std::function<void(const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, const std::vector<size_t> RemovedChunks)>
+ ReclaimCallback;
+
+ void ReclaimSpace(
+ const std::vector<BlockStoreLocation>& ChunkLocations,
+ const std::vector<size_t>& KeepChunkIndexes,
+ uint64_t PayloadAlignment,
+ bool DryRun,
+ const ReclaimCallback& Callback = [](const std::unordered_map<size_t, BlockStoreLocation>&, const std::vector<size_t>&) {});
+
+private:
+ std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
+
+ RwLock m_InsertLock; // used to serialize inserts
+ Ref<BlockStoreFile> m_WriteBlock;
+ std::uint64_t m_CurrentInsertOffset = 0;
+ std::atomic_uint32_t m_WriteBlockIndex{};
+
+ uint64_t m_MaxBlockSize = 1u << 28;
+ uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1;
+ std::filesystem::path m_BlocksBasePath;
+ std::atomic_uint64_t m_TotalSize{};
+};
+
void blockstore_forcelink();
} // namespace zen