aboutsummaryrefslogtreecommitdiff
path: root/zenstore
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2022-05-20 12:42:56 +0200
committerStefan Boberg <[email protected]>2022-05-20 12:42:56 +0200
commit5b271be0169b842cdc3d576e48bf0ddc2f122852 (patch)
tree16f501d2190f19a7281ce3f30365817464e146cb /zenstore
parentAdded ZEN_USE_CATCH2 define (diff)
parentfix mac compilation error (diff)
downloadzen-5b271be0169b842cdc3d576e48bf0ddc2f122852.tar.xz
zen-5b271be0169b842cdc3d576e48bf0ddc2f122852.zip
Merge branch 'main' into use-catch2
Diffstat (limited to 'zenstore')
-rw-r--r--zenstore/basicfile.cpp25
-rw-r--r--zenstore/blockstore.cpp1184
-rw-r--r--zenstore/cidstore.cpp23
-rw-r--r--zenstore/compactcas.cpp993
-rw-r--r--zenstore/compactcas.h7
-rw-r--r--zenstore/filecas.cpp4
-rw-r--r--zenstore/gc.cpp23
-rw-r--r--zenstore/include/zenstore/basicfile.h12
-rw-r--r--zenstore/include/zenstore/blockstore.h79
-rw-r--r--zenstore/include/zenstore/gc.h3
10 files changed, 1489 insertions, 864 deletions
diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp
index 8eb172a1c..e5a2adc41 100644
--- a/zenstore/basicfile.cpp
+++ b/zenstore/basicfile.cpp
@@ -373,31 +373,6 @@ BasicFile::SetFileSize(uint64_t FileSize)
#endif
}
-void
-BasicFile::MarkAsDeleteOnClose(std::error_code& Ec)
-{
- Ec.clear();
-#if ZEN_PLATFORM_WINDOWS
- FILE_DISPOSITION_INFO Fdi{};
- Fdi.DeleteFile = TRUE;
- BOOL Success = SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
- if (!Success)
- {
- Ec = MakeErrorCodeFromLastError();
- }
-#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- std::filesystem::path SourcePath = PathFromHandle(m_FileHandle);
- if (unlink(SourcePath.c_str()) < 0)
- {
- int UnlinkError = zen::GetLastError();
- if (UnlinkError != ENOENT)
- {
- Ec = MakeErrorCode(UnlinkError);
- }
- }
-#endif
-}
-
void*
BasicFile::Detach()
{
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index 1eb859d5a..d490678b5 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -1,13 +1,17 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "compactcas.h"
-
#include <zenstore/blockstore.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
# include <algorithm>
# include <random>
#endif
@@ -67,9 +71,9 @@ BlockStoreFile::FileSize()
}
void
-BlockStoreFile::MarkAsDeleteOnClose(std::error_code& Ec)
+BlockStoreFile::MarkAsDeleteOnClose()
{
- m_File.MarkAsDeleteOnClose(Ec);
+ m_IoBuffer.MarkAsDeleteOnClose();
}
IoBuffer
@@ -102,12 +106,809 @@ BlockStoreFile::Flush()
m_File.Flush();
}
+BasicFile&
+BlockStoreFile::GetBasicFile()
+{
+ return m_File;
+}
+
void
BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
{
m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
}
+constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024;
+
+void
+BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ const std::vector<BlockStoreLocation>& KnownLocations)
+{
+ ZEN_ASSERT(MaxBlockSize > 0);
+ ZEN_ASSERT(MaxBlockCount > 0);
+ ZEN_ASSERT(IsPow2(MaxBlockCount));
+
+ m_BlocksBasePath = BlocksBasePath;
+ m_MaxBlockSize = MaxBlockSize;
+
+ m_ChunkBlocks.clear();
+
+ std::unordered_set<uint32_t> KnownBlocks;
+ for (const auto& Entry : KnownLocations)
+ {
+ KnownBlocks.insert(Entry.BlockIndex);
+ }
+
+ if (std::filesystem::is_directory(m_BlocksBasePath))
+ {
+ std::vector<std::filesystem::path> FoldersToScan;
+ FoldersToScan.push_back(m_BlocksBasePath);
+ size_t FolderOffset = 0;
+ while (FolderOffset < FoldersToScan.size())
+ {
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
+ {
+ if (Entry.is_directory())
+ {
+ FoldersToScan.push_back(Entry.path());
+ continue;
+ }
+ if (Entry.is_regular_file())
+ {
+ const std::filesystem::path Path = Entry.path();
+ if (Path.extension() != GetBlockFileExtension())
+ {
+ continue;
+ }
+ std::string FileName = PathToUtf8(Path.stem());
+ uint32_t BlockIndex;
+ bool OK = ParseHexNumber(FileName, BlockIndex);
+ if (!OK)
+ {
+ continue;
+ }
+ if (!KnownBlocks.contains(BlockIndex))
+ {
+ // Log removing unreferenced block
+ // Clear out unused blocks
+ ZEN_INFO("removing unused block at '{}'", Path);
+ std::error_code Ec;
+ std::filesystem::remove(Path, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
+ }
+ continue;
+ }
+ Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
+ BlockFile->Open();
+ m_ChunkBlocks[BlockIndex] = BlockFile;
+ }
+ }
+ ++FolderOffset;
+ }
+ }
+ else
+ {
+ CreateDirectories(m_BlocksBasePath);
+ }
+}
+
+void
+BlockStore::Close()
+{
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+ m_WriteBlock = nullptr;
+ m_CurrentInsertOffset = 0;
+ m_WriteBlockIndex = 0;
+ m_ChunkBlocks.clear();
+ m_BlocksBasePath.clear();
+}
+
+void
+BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback)
+{
+ ZEN_ASSERT(Data != nullptr);
+ ZEN_ASSERT(Size > 0u);
+ ZEN_ASSERT(Size <= m_MaxBlockSize);
+ ZEN_ASSERT(Alignment > 0u);
+
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ bool IsWriting = m_WriteBlock != nullptr;
+ if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize)
+ {
+ if (m_WriteBlock)
+ {
+ m_WriteBlock = nullptr;
+ }
+ {
+ if (m_ChunkBlocks.size() == m_MaxBlockCount)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
+ }
+ WriteBlockIndex += IsWriting ? 1 : 0;
+ while (m_ChunkBlocks.contains(WriteBlockIndex))
+ {
+ WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+ }
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ m_WriteBlock = new BlockStoreFile(BlockPath);
+ m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ }
+ m_CurrentInsertOffset = 0;
+ m_WriteBlock->Create(m_MaxBlockSize);
+ }
+ uint64_t InsertOffset = m_CurrentInsertOffset;
+ m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
+ Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ m_ActiveWriteBlocks.push_back(WriteBlockIndex);
+ InsertLock.ReleaseNow();
+
+ WriteBlock->Write(Data, Size, InsertOffset);
+
+ Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size});
+
+ {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
+ }
+}
+
+BlockStore::ReclaimSnapshotState
+BlockStore::GetReclaimSnapshotState()
+{
+ ReclaimSnapshotState State;
+ RwLock::SharedLockScope _(m_InsertLock);
+ for (uint32_t BlockIndex : m_ActiveWriteBlocks)
+ {
+ State.m_ActiveWriteBlocks.insert(BlockIndex);
+ }
+ State.BlockCount = m_ChunkBlocks.size();
+ return State;
+}
+
+IoBuffer
+BlockStore::TryGetChunk(const BlockStoreLocation& Location)
+{
+ RwLock::SharedLockScope InsertLock(m_InsertLock);
+ if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
+ {
+ if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block)
+ {
+ return Block->GetChunk(Location.Offset, Location.Size);
+ }
+ }
+ return IoBuffer();
+}
+
+void
+BlockStore::Flush()
+{
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ if (m_CurrentInsertOffset > 0)
+ {
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+ m_WriteBlock = nullptr;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
+ }
+}
+
+void
+BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
+ const std::vector<BlockStoreLocation>& ChunkLocations,
+ const ChunkIndexArray& KeepChunkIndexes,
+ uint64_t PayloadAlignment,
+ bool DryRun,
+ const ReclaimCallback& ChangeCallback,
+ const ClaimDiskReserveCallback& DiskReserveCallback)
+{
+ if (ChunkLocations.empty())
+ {
+ return;
+ }
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = ChunkLocations.size();
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = 0;
+ uint64_t NewTotalSize = 0;
+
+ uint64_t MovedCount = 0;
+ uint64_t DeletedCount = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO(
+ "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+ "#{} "
+ "of #{} "
+ "chunks ({}).",
+ m_BlocksBasePath,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedCount,
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ });
+
+ size_t BlockCount = Snapshot.BlockCount;
+
+ std::unordered_set<size_t> KeepChunkMap;
+ KeepChunkMap.reserve(KeepChunkIndexes.size());
+ for (size_t KeepChunkIndex : KeepChunkIndexes)
+ {
+ KeepChunkMap.insert(KeepChunkIndex);
+ }
+
+ std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
+ std::vector<ChunkIndexArray> BlockKeepChunks;
+ std::vector<ChunkIndexArray> BlockDeleteChunks;
+
+ BlockIndexToChunkMapIndex.reserve(BlockCount);
+ BlockKeepChunks.reserve(BlockCount);
+ BlockDeleteChunks.reserve(BlockCount);
+ size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
+
+ size_t DeleteCount = 0;
+ for (size_t Index = 0; Index < TotalChunkCount; ++Index)
+ {
+ const BlockStoreLocation& Location = ChunkLocations[Index];
+ OldTotalSize += Location.Size;
+ if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex))
+ {
+ continue;
+ }
+
+ auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex);
+ size_t ChunkMapIndex = 0;
+ if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
+ {
+ ChunkMapIndex = BlockKeepChunks.size();
+ BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex;
+ BlockKeepChunks.resize(ChunkMapIndex + 1);
+ BlockKeepChunks.back().reserve(GuesstimateCountPerBlock);
+ BlockDeleteChunks.resize(ChunkMapIndex + 1);
+ BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock);
+ }
+ else
+ {
+ ChunkMapIndex = BlockIndexPtr->second;
+ }
+
+ if (KeepChunkMap.contains(Index))
+ {
+ ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex];
+ IndexMap.push_back(Index);
+ NewTotalSize += Location.Size;
+ continue;
+ }
+ ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex];
+ IndexMap.push_back(Index);
+ DeleteCount++;
+ }
+
+ std::unordered_set<uint32_t> BlocksToReWrite;
+ BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
+ for (const auto& Entry : BlockIndexToChunkMapIndex)
+ {
+ uint32_t BlockIndex = Entry.first;
+ size_t ChunkMapIndex = Entry.second;
+ const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex];
+ if (ChunkMap.empty())
+ {
+ continue;
+ }
+ BlocksToReWrite.insert(BlockIndex);
+ }
+
+ if (DryRun)
+ {
+ ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_BlocksBasePath,
+ DeleteCount,
+ NiceBytes(OldTotalSize - NewTotalSize),
+ TotalChunkCount,
+ OldTotalSize);
+ return;
+ }
+
+ Ref<BlockStoreFile> NewBlockFile;
+ try
+ {
+ uint64_t WriteOffset = 0;
+ uint32_t NewBlockIndex = 0;
+ for (uint32_t BlockIndex : BlocksToReWrite)
+ {
+ const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
+
+ Ref<BlockStoreFile> OldBlockFile;
+ {
+ RwLock::SharedLockScope _i(m_InsertLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ OldBlockFile = m_ChunkBlocks[BlockIndex];
+ ZEN_ASSERT(OldBlockFile);
+ }
+
+ const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex];
+ if (KeepMap.empty())
+ {
+ const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
+ for (size_t DeleteIndex : DeleteMap)
+ {
+ DeletedSize += ChunkLocations[DeleteIndex].Size;
+ }
+ ChangeCallback({}, DeleteMap);
+ DeletedCount += DeleteMap.size();
+ {
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ OldBlockFile->MarkAsDeleteOnClose();
+ }
+ continue;
+ }
+
+ MovedChunksArray MovedChunks;
+ std::vector<uint8_t> Chunk;
+ for (const size_t& ChunkIndex : KeepMap)
+ {
+ const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
+ Chunk.resize(ChunkLocation.Size);
+ OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+
+ if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
+ {
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ NewBlockFile = nullptr;
+ }
+ {
+ ChangeCallback(MovedChunks, {});
+ MovedCount += KeepMap.size();
+ MovedChunks.clear();
+ RwLock::ExclusiveLockScope __(m_InsertLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ if (m_ChunkBlocks.size() == m_MaxBlockCount)
+ {
+ ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
+ m_BlocksBasePath,
+ static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ return;
+ }
+ while (m_ChunkBlocks.contains(NextBlockIndex))
+ {
+ NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1);
+ }
+ std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+ }
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
+ return;
+ }
+ if (Space.Free < m_MaxBlockSize)
+ {
+ uint64_t ReclaimedSpace = DiskReserveCallback();
+ if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ {
+ ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
+ m_BlocksBasePath,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ RwLock::ExclusiveLockScope _l(m_InsertLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ m_ChunkBlocks.erase(NextBlockIndex);
+ return;
+ }
+
+ ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ m_BlocksBasePath,
+ ReclaimedSpace,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ }
+ NewBlockFile->Create(m_MaxBlockSize);
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
+ }
+
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}});
+ WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
+ }
+ Chunk.clear();
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ NewBlockFile = nullptr;
+ }
+
+ const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
+ for (size_t DeleteIndex : DeleteMap)
+ {
+ DeletedSize += ChunkLocations[DeleteIndex].Size;
+ }
+
+ ChangeCallback(MovedChunks, DeleteMap);
+ MovedCount += KeepMap.size();
+ DeletedCount += DeleteMap.size();
+ MovedChunks.clear();
+ {
+ RwLock::ExclusiveLockScope __(m_InsertLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ OldBlockFile->MarkAsDeleteOnClose();
+ }
+ }
+ }
+ catch (std::exception& ex)
+ {
+ ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what());
+ if (NewBlockFile)
+ {
+ ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
+ NewBlockFile->MarkAsDeleteOnClose();
+ }
+ }
+}
+
+void
+BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
+ IterateChunksSmallSizeCallback SmallSizeCallback,
+ IterateChunksLargeSizeCallback LargeSizeCallback)
+{
+ // We do a read sweep through the payloads file and validate
+ // any entries that are contained within each segment, with
+ // the assumption that most entries will be checked in this
+ // pass. An alternative strategy would be to use memory mapping.
+
+ {
+ ChunkIndexArray BigChunks;
+ IoBuffer ReadBuffer{ScrubSmallChunkWindowSize};
+ void* BufferBase = ReadBuffer.MutableData();
+
+ RwLock::SharedLockScope _(m_InsertLock);
+
+ for (const auto& Block : m_ChunkBlocks)
+ {
+ uint64_t WindowStart = 0;
+ uint64_t WindowEnd = ScrubSmallChunkWindowSize;
+ uint32_t BlockIndex = Block.first;
+ const Ref<BlockStoreFile>& BlockFile = Block.second;
+ const uint64_t FileSize = BlockFile->FileSize();
+
+ do
+ {
+ const uint64_t ChunkSize = Min(ScrubSmallChunkWindowSize, FileSize - WindowStart);
+ BlockFile->Read(BufferBase, ChunkSize, WindowStart);
+
+ // TODO: We could be smarter here if the ChunkLocations were sorted on block index - we could
+ // then only scan a subset of ChunkLocations instead of scanning through them all...
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
+ {
+ const BlockStoreLocation Location = ChunkLocations[ChunkIndex];
+ if (BlockIndex != Location.BlockIndex)
+ {
+ continue;
+ }
+
+ const uint64_t EntryOffset = Location.Offset;
+ if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
+ {
+ const uint64_t EntryEnd = EntryOffset + Location.Size;
+
+ if (EntryEnd >= WindowEnd)
+ {
+ BigChunks.push_back(ChunkIndex);
+
+ continue;
+ }
+
+ SmallSizeCallback(ChunkIndex,
+ reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart,
+ Location.Size);
+ }
+ }
+
+ WindowStart += ScrubSmallChunkWindowSize;
+ WindowEnd += ScrubSmallChunkWindowSize;
+ } while (WindowStart < FileSize);
+ }
+
+ // Deal with large chunks and chunks that extend over a ScrubSmallChunkWindowSize border
+ for (size_t ChunkIndex : BigChunks)
+ {
+ const BlockStoreLocation Location = ChunkLocations[ChunkIndex];
+ const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex];
+ LargeSizeCallback(ChunkIndex, BlockFile, Location.Offset, Location.Size);
+ }
+ }
+}
+
+bool
+BlockStore::Split(const std::vector<BlockStoreLocation>& ChunkLocations,
+ const std::filesystem::path& SourceBlockFilePath,
+ const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ size_t PayloadAlignment,
+ bool CleanSource,
+ const SplitCallback& Callback)
+{
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(BlocksBasePath.parent_path(), Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", BlocksBasePath, Error.message());
+ return false;
+ }
+
+ if (Space.Free < MaxBlockSize)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
+ BlocksBasePath,
+ MaxBlockSize,
+ NiceBytes(Space.Free));
+ return false;
+ }
+
+ size_t TotalSize = 0;
+ for (const BlockStoreLocation& Location : ChunkLocations)
+ {
+ TotalSize += Location.Size;
+ }
+ size_t ChunkCount = ChunkLocations.size();
+ uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * ChunkCount);
+ uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize;
+ if (MaxRequiredBlockCount > MaxBlockCount)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
+ BlocksBasePath,
+ MaxRequiredBlockCount,
+ MaxBlockCount);
+ return false;
+ }
+
+ constexpr const uint64_t DiskReserve = 1ul << 28;
+
+ if (CleanSource)
+ {
+ if (Space.Free < (MaxBlockSize + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ BlocksBasePath,
+ NiceBytes(MaxBlockSize + DiskReserve),
+ NiceBytes(Space.Free));
+ return false;
+ }
+ }
+ else
+ {
+ if (Space.Free < (RequiredDiskSpace + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ BlocksBasePath,
+ NiceBytes(RequiredDiskSpace + DiskReserve),
+ NiceBytes(Space.Free));
+ return false;
+ }
+ }
+
+ uint32_t WriteBlockIndex = 0;
+ while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
+ }
+
+ BasicFile BlockFile;
+ BlockFile.Open(SourceBlockFilePath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+
+ if (CleanSource && (MaxRequiredBlockCount < 2))
+ {
+ MovedChunksArray Chunks;
+ Chunks.reserve(ChunkCount);
+ for (size_t Index = 0; Index < ChunkCount; ++Index)
+ {
+ const BlockStoreLocation& ChunkLocation = ChunkLocations[Index];
+ Chunks.push_back({Index, {.BlockIndex = WriteBlockIndex, .Offset = ChunkLocation.Offset, .Size = ChunkLocation.Size}});
+ }
+ std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex);
+ CreateDirectories(BlockPath.parent_path());
+ BlockFile.Close();
+ std::filesystem::rename(SourceBlockFilePath, BlockPath);
+ Callback(Chunks);
+ return true;
+ }
+
+ ChunkIndexArray ChunkIndexes;
+ ChunkIndexes.reserve(ChunkCount);
+ for (size_t Index = 0; Index < ChunkCount; ++Index)
+ {
+ ChunkIndexes.push_back(Index);
+ }
+
+ std::sort(begin(ChunkIndexes), end(ChunkIndexes), [&](size_t Lhs, size_t Rhs) {
+ const BlockStoreLocation& LhsLocation = ChunkLocations[Lhs];
+ const BlockStoreLocation& RhsLocation = ChunkLocations[Rhs];
+ return LhsLocation.Offset < RhsLocation.Offset;
+ });
+
+ uint64_t BlockSize = 0;
+ uint64_t BlockOffset = 0;
+ std::vector<BlockStoreLocation> NewLocations;
+ struct BlockData
+ {
+ MovedChunksArray Chunks;
+ uint64_t BlockOffset;
+ uint64_t BlockSize;
+ uint32_t BlockIndex;
+ };
+
+ std::vector<BlockData> BlockRanges;
+ MovedChunksArray Chunks;
+ BlockRanges.reserve(MaxRequiredBlockCount);
+ for (const size_t& ChunkIndex : ChunkIndexes)
+ {
+ const BlockStoreLocation& LegacyChunkLocation = ChunkLocations[ChunkIndex];
+
+ uint64_t ChunkOffset = LegacyChunkLocation.Offset;
+ uint64_t ChunkSize = LegacyChunkLocation.Size;
+ uint64_t ChunkEnd = ChunkOffset + ChunkSize;
+
+ if (BlockSize == 0)
+ {
+ BlockOffset = ChunkOffset;
+ }
+ if ((ChunkEnd - BlockOffset) > MaxBlockSize)
+ {
+ BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
+ BlockRange.Chunks.swap(Chunks);
+ BlockRanges.push_back(BlockRange);
+
+ WriteBlockIndex++;
+ while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
+ }
+ BlockOffset = ChunkOffset;
+ BlockSize = 0;
+ }
+ BlockSize = RoundUp(BlockSize, PayloadAlignment);
+ BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
+ Chunks.push_back({ChunkIndex, ChunkLocation});
+ BlockSize = ChunkEnd - BlockOffset;
+ }
+ if (BlockSize > 0)
+ {
+ BlockRanges.push_back(
+ {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
+ }
+
+ Stopwatch WriteBlockTimer;
+
+ std::reverse(BlockRanges.begin(), BlockRanges.end());
+ std::vector<std::uint8_t> Buffer(1 << 28);
+ for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
+ {
+ const BlockData& BlockRange = BlockRanges[Idx];
+ if (Idx > 0)
+ {
+ uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
+ uint64_t Completed = BlockOffset + BlockSize - Remaining;
+ uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
+
+ ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
+ BlocksBasePath,
+ Idx,
+ BlockRanges.size(),
+ NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
+ NiceBytes(BlockOffset + BlockSize),
+ NiceTimeSpanMs(ETA));
+ }
+
+ std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, BlockRange.BlockIndex);
+ BlockStoreFile ChunkBlock(BlockPath);
+ ChunkBlock.Create(BlockRange.BlockSize);
+ uint64_t Offset = 0;
+ while (Offset < BlockRange.BlockSize)
+ {
+ uint64_t Size = BlockRange.BlockSize - Offset;
+ if (Size > Buffer.size())
+ {
+ Size = Buffer.size();
+ }
+ BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
+ ChunkBlock.Write(Buffer.data(), Size, Offset);
+ Offset += Size;
+ }
+ ChunkBlock.Truncate(Offset);
+ ChunkBlock.Flush();
+
+ Callback(BlockRange.Chunks);
+
+ if (CleanSource)
+ {
+ BlockFile.SetFileSize(BlockRange.BlockOffset);
+ }
+ }
+ BlockFile.Close();
+
+ return true;
+}
+
+const char*
+BlockStore::GetBlockFileExtension()
+{
+ return ".ucas";
+}
+
+std::filesystem::path
+BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
+{
+ ExtendablePathBuilder<256> Path;
+
+ char BlockHexString[9];
+ ToHexNumber(BlockIndex, BlockHexString);
+
+ Path.Append(BlocksBasePath);
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
+ Path.AppendSeparator();
+ Path.Append(BlockHexString);
+ Path.Append(GetBlockFileExtension());
+ return Path.ToPath();
+}
+
#if ZEN_WITH_TESTS
static bool
@@ -217,9 +1018,7 @@ TEST_CASE("blockstore.blockfile")
{
BlockStoreFile File1(RootDirectory / "1");
File1.Open();
- std::error_code Ec;
- File1.MarkAsDeleteOnClose(Ec);
- CHECK(!Ec);
+ File1.MarkAsDeleteOnClose();
DataChunk = File1.GetChunk(0, 5);
BoopChunk = File1.GetChunk(5, 5);
}
@@ -232,6 +1031,377 @@ TEST_CASE("blockstore.blockfile")
CHECK(!std::filesystem::exists(RootDirectory / "1"));
}
+namespace {
+ BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment)
+ {
+ BlockStoreLocation Location;
+ Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; });
+ CHECK(Location.Size == String.length());
+ return Location;
+ };
+
+ std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location)
+ {
+ IoBuffer ChunkData = Store.TryGetChunk(Location);
+ if (!ChunkData)
+ {
+ return "";
+ }
+ std::string AsString((const char*)ChunkData.Data(), ChunkData.Size());
+ return AsString;
+ };
+
+ std::vector<std::filesystem::path> GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories)
+ {
+ DirectoryContent DirectoryContent;
+ GetDirectoryContent(RootDir,
+ DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) |
+ (Directories ? DirectoryContent::IncludeDirsFlag : 0),
+ DirectoryContent);
+ std::vector<std::filesystem::path> Result;
+ Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end());
+ Result.insert(Result.end(), DirectoryContent.Files.begin(), DirectoryContent.Files.end());
+ return Result;
+ };
+
+ static IoBuffer CreateChunk(uint64_t Size)
+ {
+ static std::random_device rd;
+ static std::mt19937 g(rd());
+
+ std::vector<uint8_t> Values;
+ Values.resize(Size);
+ for (size_t Idx = 0; Idx < Size; ++Idx)
+ {
+ Values[Idx] = static_cast<uint8_t>(Idx);
+ }
+ std::shuffle(Values.begin(), Values.end(), g);
+
+ return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
+ }
+} // namespace
+
+TEST_CASE("blockstore.chunks")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ Store.Initialize(RootDirectory, 128, 1024, {});
+ IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
+ CHECK(!BadChunk);
+
+ std::string FirstChunkData = "This is the data of the first chunk that we will write";
+ BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
+ std::string SecondChunkData = "This is the data for the second chunk that we will write";
+ BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
+
+ CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
+ CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);
+
+ std::string ThirdChunkData =
+ "This is a much longer string that will not fit in the first block so it should be placed in the second block";
+ BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4);
+ CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex);
+
+ CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData);
+ CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData);
+ CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData);
+}
+
+TEST_CASE("blockstore.clean.stray.blocks")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ Store.Initialize(RootDirectory / "store", 128, 1024, {});
+
+ std::string FirstChunkData = "This is the data of the first chunk that we will write";
+ BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
+ std::string SecondChunkData = "This is the data for the second chunk that we will write";
+ BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
+ std::string ThirdChunkData =
+ "This is a much longer string that will not fit in the first block so it should be placed in the second block";
+ WriteStringAsChunk(Store, ThirdChunkData, 4);
+
+ Store.Close();
+
+    // Not referencing the second block means that it should be deleted
+ Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation});
+
+ CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1);
+}
+
+TEST_CASE("blockstore.flush.forces.new.block")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ Store.Initialize(RootDirectory / "store", 128, 1024, {});
+
+ std::string FirstChunkData = "This is the data of the first chunk that we will write";
+ WriteStringAsChunk(Store, FirstChunkData, 4);
+ Store.Flush();
+ std::string SecondChunkData = "This is the data for the second chunk that we will write";
+ WriteStringAsChunk(Store, SecondChunkData, 4);
+ Store.Flush();
+ std::string ThirdChunkData =
+ "This is a much longer string that will not fit in the first block so it should be placed in the second block";
+ WriteStringAsChunk(Store, ThirdChunkData, 4);
+
+ CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3);
+}
+
+TEST_CASE("blockstore.iterate.chunks")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {});
+ IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
+ CHECK(!BadChunk);
+
+ std::string FirstChunkData = "This is the data of the first chunk that we will write";
+ BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
+
+ std::string SecondChunkData = "This is the data for the second chunk that we will write";
+ BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
+ Store.Flush();
+
+ std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L');
+ BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);
+
+ Store.IterateChunks(
+ {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation},
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ CHECK(Data);
+ CHECK(Size > 0);
+ std::string AsString((const char*)Data, Size);
+ switch (ChunkIndex)
+ {
+ case 0:
+ CHECK(AsString == FirstChunkData);
+ break;
+ case 1:
+ CHECK(AsString == SecondChunkData);
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ },
+ [&](size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size) {
+ CHECK(BlockFile);
+ CHECK(ChunkIndex == 2);
+ CHECK(Offset == VeryLargeChunkLocation.Offset);
+ CHECK(Size == VeryLargeChunkLocation.Size);
+ size_t StreamOffset = 0;
+ BlockFile->StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) {
+ const char* VeryLargeChunkSection = &(VeryLargeChunk.data()[StreamOffset]);
+ CHECK(memcmp(VeryLargeChunkSection, Data, Size) == 0);
+ });
+ });
+}
+
+TEST_CASE("blockstore.reclaim.space")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ Store.Initialize(RootDirectory / "store", 512, 1024, {});
+
+ constexpr size_t ChunkCount = 200;
+ constexpr size_t Alignment = 8;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ std::vector<IoHash> ChunkHashes;
+ ChunkLocations.reserve(ChunkCount);
+ ChunkHashes.reserve(ChunkCount);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ IoBuffer Chunk = CreateChunk(57 + ChunkIndex);
+
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); });
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ std::vector<size_t> ChunksToKeep;
+ ChunksToKeep.reserve(ChunkLocations.size());
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ ChunksToKeep.push_back(ChunkIndex);
+ }
+
+ Store.Flush();
+ BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState();
+ Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true);
+
+ // If we keep all the chunks we should not get any callbacks on moved/deleted stuff
+ Store.ReclaimSpace(
+ State1,
+ ChunkLocations,
+ ChunksToKeep,
+ Alignment,
+ false,
+ [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); },
+ []() {
+ CHECK(false);
+ return 0;
+ });
+
+ size_t DeleteChunkCount = 38;
+ ChunksToKeep.clear();
+ for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ ChunksToKeep.push_back(ChunkIndex);
+ }
+
+ std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations;
+ size_t MovedChunkCount = 0;
+ size_t DeletedChunkCount = 0;
+ Store.ReclaimSpace(
+ State1,
+ ChunkLocations,
+ ChunksToKeep,
+ Alignment,
+ false,
+ [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
+ for (const auto& MovedChunk : MovedChunks)
+ {
+ CHECK(MovedChunk.first >= DeleteChunkCount);
+ NewChunkLocations[MovedChunk.first] = MovedChunk.second;
+ }
+ MovedChunkCount += MovedChunks.size();
+ for (size_t DeletedIndex : DeletedChunks)
+ {
+ CHECK(DeletedIndex < DeleteChunkCount);
+ }
+ DeletedChunkCount += DeletedChunks.size();
+ },
+ []() {
+ CHECK(false);
+ return 0;
+ });
+ CHECK(MovedChunkCount <= DeleteChunkCount);
+ CHECK(DeletedChunkCount == DeleteChunkCount);
+ ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end());
+
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]);
+ if (ChunkIndex >= DeleteChunkCount)
+ {
+ IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ }
+ }
+
+ NewChunkLocations = ChunkLocations;
+ MovedChunkCount = 0;
+ DeletedChunkCount = 0;
+ Store.ReclaimSpace(
+ State1,
+ ChunkLocations,
+ {},
+ Alignment,
+ false,
+ [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
+ CHECK(MovedChunks.empty());
+ DeletedChunkCount += DeletedChunks.size();
+ },
+ []() {
+ CHECK(false);
+ return 0;
+ });
+ CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount);
+}
+
+TEST_CASE("blockstore.thread.read.write")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ Store.Initialize(RootDirectory / "store", 1088, 1024, {});
+
+ constexpr size_t ChunkCount = 1000;
+ constexpr size_t Alignment = 8;
+ std::vector<IoBuffer> Chunks;
+ std::vector<IoHash> ChunkHashes;
+ Chunks.reserve(ChunkCount);
+ ChunkHashes.reserve(ChunkCount);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2);
+ Chunks.push_back(Chunk);
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkLocations.resize(ChunkCount);
+
+ WorkerThreadPool WorkerPool(8);
+ std::atomic<size_t> WorkCompleted = 0;
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+
+ WorkCompleted = 0;
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+
+ std::vector<BlockStoreLocation> SecondChunkLocations;
+ SecondChunkLocations.resize(ChunkCount);
+ WorkCompleted = 0;
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ SecondChunkLocations[ChunkIndex] = L;
+ });
+ WorkCompleted.fetch_add(1);
+ });
+ WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size() * 2)
+ {
+ Sleep(1);
+ }
+}
+
#endif
void
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 509d21abe..55bec817f 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -234,23 +234,22 @@ struct CidStore::Impl
void RemoveCids(CasChunkSet& CasChunks)
{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- for (auto It = m_CidMap.begin(), End = m_CidMap.end(); It != End;)
+ std::vector<IndexEntry> RemovedEntries;
+ RemovedEntries.reserve(CasChunks.GetSize());
{
- if (CasChunks.ContainsChunk(It->second))
- {
- const IoHash& BadHash = It->first;
-
- // Log a tombstone record
- LogMapping(BadHash, IoHash::Zero);
- It = m_CidMap.erase(It);
- }
- else
+ RwLock::ExclusiveLockScope _(m_Lock);
+ for (auto It = m_CidMap.begin(), End = m_CidMap.end(); It != End;)
{
+ if (CasChunks.ContainsChunk(It->second))
+ {
+ RemovedEntries.push_back({It->first, IoHash::Zero});
+ It = m_CidMap.erase(It);
+ continue;
+ }
++It;
}
}
+ m_LogFile.Append(RemovedEntries);
}
uint64_t m_LastScrubTime = 0;
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 032c9bcb3..c277359bd 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -48,25 +48,8 @@ struct CasDiskIndexHeader
static_assert(sizeof(CasDiskIndexHeader) == 32);
namespace {
- std::vector<CasDiskIndexEntry> MakeCasDiskEntries(const std::unordered_map<IoHash, BlockStoreDiskLocation>& MovedChunks,
- const std::vector<IoHash>& DeletedChunks)
- {
- std::vector<CasDiskIndexEntry> result;
- result.reserve(MovedChunks.size());
- for (const auto& MovedEntry : MovedChunks)
- {
- result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second});
- }
- for (const IoHash& ChunkHash : DeletedChunks)
- {
- result.push_back({.Key = ChunkHash, .Flags = CasDiskIndexEntry::kTombstone});
- }
- return result;
- }
-
const char* IndexExtension = ".uidx";
const char* LogExtension = ".ulog";
- const char* DataExtension = ".ucas";
std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
{
@@ -93,22 +76,6 @@ namespace {
return GetBasePath(RootPath, ContainerBaseName) / "blocks";
}
- std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
- {
- ExtendablePathBuilder<256> Path;
-
- char BlockHexString[9];
- ToHexNumber(BlockIndex, BlockHexString);
-
- Path.Append(BlocksBasePath);
- Path.AppendSeparator();
- Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
- Path.AppendSeparator();
- Path.Append(BlockHexString);
- Path.Append(DataExtension);
- return Path.ToPath();
- }
-
std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
{
return RootPath / (ContainerBaseName + LogExtension);
@@ -116,7 +83,7 @@ namespace {
std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
{
- return RootPath / (ContainerBaseName + DataExtension);
+ return RootPath / (ContainerBaseName + ".ucas");
}
std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
@@ -263,53 +230,12 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
- uint32_t WriteBlockIndex;
- Ref<BlockStoreFile> WriteBlock;
- uint64_t InsertOffset;
{
- RwLock::ExclusiveLockScope _(m_InsertLock);
-
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ if (m_LocationMap.contains(ChunkHash))
{
- RwLock::SharedLockScope __(m_LocationMapLock);
- if (m_LocationMap.contains(ChunkHash))
- {
- return CasStore::InsertResult{.New = false};
- }
+ return CasStore::InsertResult{.New = false};
}
-
- // New entry
-
- WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- bool IsWriting = m_WriteBlock != nullptr;
- if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize)
- {
- if (m_WriteBlock)
- {
- m_WriteBlock = nullptr;
- }
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
- {
- throw std::runtime_error(
- fmt::format("unable to allocate a new block in '{}'", m_Config.RootDirectory / m_ContainerBaseName));
- }
- WriteBlockIndex += IsWriting ? 1 : 0;
- while (m_ChunkBlocks.contains(WriteBlockIndex))
- {
- WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
- }
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- m_WriteBlock = new BlockStoreFile(BlockPath);
- m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
- }
- m_CurrentInsertOffset = 0;
- m_WriteBlock->Create(m_MaxBlockSize);
- }
- InsertOffset = m_CurrentInsertOffset;
- m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment);
- WriteBlock = m_WriteBlock;
}
// We can end up in a situation that InsertChunk writes the same chunk data in
@@ -324,17 +250,16 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
// This should be a rare occasion and the current flow reduces the time we block for
// reads, insert and GC.
- BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment);
- const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
-
- WriteBlock->Write(ChunkData, ChunkSize, InsertOffset);
- m_CasLog.Append(IndexEntry);
-
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, Location);
- }
+ m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) {
+ BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
+ const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
+ m_CasLog.Append(IndexEntry);
+ {
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, DiskLocation);
+ }
+ });
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
return CasStore::InsertResult{.New = true};
}
@@ -348,21 +273,16 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
- Ref<BlockStoreFile> ChunkBlock;
- BlockStoreLocation Location;
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ auto KeyIt = m_LocationMap.find(ChunkHash);
+ if (KeyIt == m_LocationMap.end())
{
- RwLock::SharedLockScope _(m_LocationMapLock);
- if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
- {
- Location = KeyIt->second.Get(m_PayloadAlignment);
- ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
- }
- else
- {
- return IoBuffer();
- }
+ return IoBuffer();
}
- return ChunkBlock->GetChunk(Location.Offset, Location.Size);
+ const BlockStoreLocation& Location = KeyIt->second.Get(m_PayloadAlignment);
+
+ IoBuffer Chunk = m_BlockStore.TryGetChunk(Location);
+ return Chunk;
}
bool
@@ -388,128 +308,94 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
CasContainerStrategy::Flush()
{
- {
- RwLock::ExclusiveLockScope _(m_InsertLock);
- if (m_CurrentInsertOffset > 0)
- {
- uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
- m_WriteBlock = nullptr;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
- m_CurrentInsertOffset = 0;
- }
- }
+ m_BlockStore.Flush();
MakeIndexSnapshot();
}
void
CasContainerStrategy::Scrub(ScrubContext& Ctx)
{
- std::vector<CasDiskIndexEntry> BadChunks;
-
- // We do a read sweep through the payloads file and validate
- // any entries that are contained within each segment, with
- // the assumption that most entries will be checked in this
- // pass. An alternative strategy would be to use memory mapping.
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ uint64_t TotalChunkCount = m_LocationMap.size();
+ std::vector<BlockStoreLocation> ChunkLocations;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
{
- std::vector<CasDiskIndexEntry> BigChunks;
- const uint64_t WindowSize = 4 * 1024 * 1024;
- IoBuffer ReadBuffer{WindowSize};
- void* BufferBase = ReadBuffer.MutableData();
-
- RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time?
- RwLock::SharedLockScope __(m_LocationMapLock);
-
- for (const auto& Block : m_ChunkBlocks)
+ for (const auto& Entry : m_LocationMap)
{
- uint64_t WindowStart = 0;
- uint64_t WindowEnd = WindowSize;
- const Ref<BlockStoreFile>& BlockFile = Block.second;
- BlockFile->Open();
- const uint64_t FileSize = BlockFile->FileSize();
-
- do
- {
- const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
- BlockFile->Read(BufferBase, ChunkSize, WindowStart);
-
- for (auto& Entry : m_LocationMap)
- {
- const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment);
- const uint64_t EntryOffset = Location.Offset;
+ const IoHash& ChunkHash = Entry.first;
+ const BlockStoreDiskLocation& DiskLocation = Entry.second;
+ BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
- if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
- {
- const uint64_t EntryEnd = EntryOffset + Location.Size;
-
- if (EntryEnd >= WindowEnd)
- {
- BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
-
- continue;
- }
-
- const IoHash ComputedHash =
- IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, Location.Size);
-
- if (Entry.first != ComputedHash)
- {
- // Hash mismatch
- BadChunks.push_back({.Key = Entry.first, .Location = Entry.second, .Flags = CasDiskIndexEntry::kTombstone});
- }
- }
- }
-
- WindowStart += WindowSize;
- WindowEnd += WindowSize;
- } while (WindowStart < FileSize);
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
}
+ }
- // Deal with large chunks
-
- for (const CasDiskIndexEntry& Entry : BigChunks)
- {
- IoHashStream Hasher;
- const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment);
- const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex];
- BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
+ std::vector<IoHash> BadKeys;
- if (Entry.Key != ComputedHash)
+ m_BlockStore.IterateChunks(
+ ChunkLocations,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ const IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
+ const IoHash& ExpectedHash = ChunkIndexToChunkHash[ChunkIndex];
+ if (ComputedHash != ExpectedHash)
{
- BadChunks.push_back({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone});
+ // Hash mismatch
+ BadKeys.push_back(ExpectedHash);
}
- }
- }
+ },
+ [&](size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size) {
+ IoHashStream Hasher;
+ BlockFile->StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
+ IoHash ComputedHash = Hasher.GetHash();
+ const IoHash& ExpectedHash = ChunkIndexToChunkHash[ChunkIndex];
+ if (ComputedHash != ExpectedHash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(ExpectedHash);
+ }
+ });
- if (BadChunks.empty())
+ if (BadKeys.empty())
{
return;
}
- ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_Config.RootDirectory / m_ContainerBaseName);
+ ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_Config.RootDirectory / m_ContainerBaseName);
- // Deal with bad chunks by removing them from our lookup map
+ _.ReleaseNow();
- std::vector<IoHash> BadChunkHashes;
- BadChunkHashes.reserve(BadChunks.size());
-
- m_CasLog.Append(BadChunks);
+ if (Ctx.RunRecovery())
{
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- for (const CasDiskIndexEntry& Entry : BadChunks)
+ // Deal with bad chunks by removing them from our lookup map
+
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
{
- BadChunkHashes.push_back(Entry.Key);
- m_LocationMap.erase(Entry.Key);
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ for (const IoHash& ChunkHash : BadKeys)
+ {
+ const auto KeyIt = m_LocationMap.find(ChunkHash);
+ if (KeyIt == m_LocationMap.end())
+ {
+ // Might have been GC'd
+ continue;
+ }
+ LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
+ m_LocationMap.erase(KeyIt);
+ }
}
+ m_CasLog.Append(LogEntries);
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
-
- Ctx.ReportBadCasChunks(BadChunkHashes);
+ Ctx.ReportBadCasChunks(BadKeys);
}
void
@@ -533,93 +419,33 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// We update the index as we complete each new block file. This makes it possible
// to break the GC if we want to limit time for execution.
//
- // GC can fairly parallell to regular operation - it will block while taking
- // a snapshot of the current m_LocationMap state.
- //
- // While moving blocks it will do a blocking operation and update the m_LocationMap
- // after each new block is written and figuring out the path to the next new block.
+    // GC can run largely in parallel with regular operation - it will block while taking
+    // a snapshot of the current m_LocationMap state, and while moving blocks it will
+    // do a blocking operation and update the m_LocationMap after each new block is
+    // written and the path to the next new block is figured out.
ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+
uint64_t WriteBlockTimeUs = 0;
uint64_t WriteBlockLongestTimeUs = 0;
uint64_t ReadBlockTimeUs = 0;
uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = 0;
- uint64_t DeletedSize = 0;
- uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
- std::vector<IoHash> DeletedChunks;
- uint64_t MovedCount = 0;
-
- Stopwatch TotalTimer;
- const auto _ = MakeGuard([this,
- &TotalTimer,
- &WriteBlockTimeUs,
- &WriteBlockLongestTimeUs,
- &ReadBlockTimeUs,
- &ReadBlockLongestTimeUs,
- &TotalChunkCount,
- &DeletedChunks,
- &MovedCount,
- &DeletedSize,
- OldTotalSize] {
- ZEN_INFO(
- "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
- "#{} "
- "of #{} "
- "chunks ({}).",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs),
- NiceBytes(DeletedSize),
- DeletedChunks.size(),
- MovedCount,
- TotalChunkCount,
- NiceBytes(OldTotalSize));
- });
-
- LocationMap_t LocationMap;
- size_t BlockCount;
- uint64_t ExcludeBlockIndex = 0x800000000ull;
+ LocationMap_t LocationMap;
+ BlockStore::ReclaimSnapshotState BlockStoreState;
{
- RwLock::SharedLockScope __(m_InsertLock);
RwLock::SharedLockScope ___(m_LocationMapLock);
- {
- Stopwatch Timer;
- const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (m_WriteBlock)
- {
- ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- }
- __.ReleaseNow();
- }
- LocationMap = m_LocationMap;
- BlockCount = m_ChunkBlocks.size();
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ LocationMap = m_LocationMap;
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
}
- if (LocationMap.empty())
- {
- ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName);
- return;
- }
-
- TotalChunkCount = LocationMap.size();
-
- std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
- std::vector<std::vector<IoHash>> KeepChunks;
- std::vector<std::vector<IoHash>> DeleteChunks;
-
- BlockIndexToChunkMapIndex.reserve(BlockCount);
- KeepChunks.reserve(BlockCount);
- DeleteChunks.reserve(BlockCount);
- size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
+ uint64_t TotalChunkCount = LocationMap.size();
std::vector<IoHash> TotalChunkHashes;
TotalChunkHashes.reserve(TotalChunkCount);
@@ -628,272 +454,82 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
TotalChunkHashes.push_back(Entry.first);
}
- uint64_t DeleteCount = 0;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ BlockStore::ChunkIndexArray KeepChunkIndexes;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
- uint64_t NewTotalSize = 0;
GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
- auto KeyIt = LocationMap.find(ChunkHash);
- const BlockStoreDiskLocation& Location = KeyIt->second;
- uint32_t BlockIndex = Location.GetBlockIndex();
-
- if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex)
- {
- return;
- }
+ auto KeyIt = LocationMap.find(ChunkHash);
+ const BlockStoreDiskLocation& DiskLocation = KeyIt->second;
+ BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
- auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex);
- size_t ChunkMapIndex = 0;
- if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
- {
- ChunkMapIndex = KeepChunks.size();
- BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex;
- KeepChunks.resize(ChunkMapIndex + 1);
- KeepChunks.back().reserve(GuesstimateCountPerBlock);
- DeleteChunks.resize(ChunkMapIndex + 1);
- DeleteChunks.back().reserve(GuesstimateCountPerBlock);
- }
- else
- {
- ChunkMapIndex = BlockIndexPtr->second;
- }
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
if (Keep)
{
- std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex];
- ChunkMap.push_back(ChunkHash);
- NewTotalSize += Location.GetSize();
- }
- else
- {
- std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
- ChunkMap.push_back(ChunkHash);
- DeleteCount++;
+ KeepChunkIndexes.push_back(ChunkIndex);
}
});
- std::unordered_set<uint32_t> BlocksToReWrite;
- BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
- for (const auto& Entry : BlockIndexToChunkMapIndex)
- {
- uint32_t BlockIndex = Entry.first;
- size_t ChunkMapIndex = Entry.second;
- const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
- if (ChunkMap.empty())
- {
- continue;
- }
- BlocksToReWrite.insert(BlockIndex);
- }
-
const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
if (!PerformDelete)
{
- uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
- ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- DeleteCount,
- NiceBytes(TotalSize - NewTotalSize),
- TotalChunkCount,
- NiceBytes(TotalSize));
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
return;
}
- // Move all chunks in blocks that have chunks removed to new blocks
-
- Ref<BlockStoreFile> NewBlockFile;
- uint64_t WriteOffset = 0;
- uint32_t NewBlockIndex = 0;
- DeletedChunks.reserve(DeleteCount);
-
- auto UpdateLocations = [this](const std::span<CasDiskIndexEntry>& Entries) {
- for (const CasDiskIndexEntry& Entry : Entries)
- {
- if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+ std::vector<IoHash> DeletedChunks;
+ m_BlockStore.ReclaimSpace(
+ BlockStoreState,
+ ChunkLocations,
+ KeepChunkIndexes,
+ m_PayloadAlignment,
+ false,
+ [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- auto KeyIt = m_LocationMap.find(Entry.Key);
- uint64_t ChunkSize = KeyIt->second.GetSize();
- m_TotalSize.fetch_sub(ChunkSize);
- m_LocationMap.erase(KeyIt);
- continue;
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}});
+ }
+ for (const size_t ChunkIndex : RemovedChunks)
+ {
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash];
+ LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone});
+ DeletedChunks.push_back(ChunkHash);
}
- m_LocationMap[Entry.Key] = Entry.Location;
- }
- };
-
- std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks;
- for (uint32_t BlockIndex : BlocksToReWrite)
- {
- const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
-
- Ref<BlockStoreFile> OldBlockFile;
- {
- RwLock::SharedLockScope _i(m_LocationMapLock);
- OldBlockFile = m_ChunkBlocks[BlockIndex];
- }
- const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex];
- if (KeepMap.empty())
- {
- const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
- std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap);
m_CasLog.Append(LogEntries);
m_CasLog.Flush();
{
- RwLock::ExclusiveLockScope _i(m_LocationMapLock);
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
Stopwatch Timer;
- const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ const auto ____ = MakeGuard([&] {
uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
ReadBlockTimeUs += ElapsedUs;
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
- UpdateLocations(LogEntries);
- m_ChunkBlocks[BlockIndex] = nullptr;
- }
- DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
- ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'",
- m_ContainerBaseName,
- BlockIndex,
- OldBlockFile->GetPath());
- std::error_code Ec;
- OldBlockFile->MarkAsDeleteOnClose(Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
- }
- continue;
- }
-
- std::vector<uint8_t> Chunk;
- for (const IoHash& ChunkHash : KeepMap)
- {
- auto KeyIt = LocationMap.find(ChunkHash);
- const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment);
- Chunk.resize(ChunkLocation.Size);
- OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
-
- if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
- {
- uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
- std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {});
- m_CasLog.Append(LogEntries);
- m_CasLog.Flush();
-
- if (NewBlockFile)
- {
- NewBlockFile->Truncate(WriteOffset);
- NewBlockFile->Flush();
- }
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- UpdateLocations(LogEntries);
- if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
- {
- ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
- m_Config.RootDirectory / m_ContainerBaseName,
- static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
- return;
- }
- while (m_ChunkBlocks.contains(NextBlockIndex))
- {
- NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
- }
- std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
- NewBlockFile = new BlockStoreFile(NewBlockPath);
- m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
- }
-
- MovedCount += MovedBlockChunks.size();
- MovedBlockChunks.clear();
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
- if (Error)
+ for (const CasDiskIndexEntry& Entry : LogEntries)
{
- ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
- return;
- }
- if (Space.Free < m_MaxBlockSize)
- {
- uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve();
- if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ if (Entry.Flags & CasDiskIndexEntry::kTombstone)
{
- ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- m_MaxBlockSize,
- NiceBytes(Space.Free + ReclaimedSpace));
- RwLock::ExclusiveLockScope _l(m_LocationMapLock);
- Stopwatch Timer;
- const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- m_ChunkBlocks.erase(NextBlockIndex);
- return;
+ m_LocationMap.erase(Entry.Key);
+ uint64_t ChunkSize = Entry.Location.GetSize();
+ m_TotalSize.fetch_sub(ChunkSize);
+ continue;
}
-
- ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- ReclaimedSpace,
- NiceBytes(Space.Free + ReclaimedSpace));
+ m_LocationMap[Entry.Key] = Entry.Location;
}
- NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
}
-
- NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
- MovedBlockChunks.emplace(
- ChunkHash,
- BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment));
- WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment);
- }
- Chunk.clear();
- if (NewBlockFile)
- {
- NewBlockFile->Truncate(WriteOffset);
- NewBlockFile->Flush();
- NewBlockFile = {};
- }
-
- const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
- std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap);
- m_CasLog.Append(LogEntries);
- m_CasLog.Flush();
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- UpdateLocations(LogEntries);
- m_ChunkBlocks[BlockIndex] = nullptr;
- }
- MovedCount += MovedBlockChunks.size();
- DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
- MovedBlockChunks.clear();
-
- ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_ContainerBaseName, BlockIndex, OldBlockFile->GetPath());
- std::error_code Ec;
- OldBlockFile->MarkAsDeleteOnClose(Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
- }
- OldBlockFile = nullptr;
- }
-
- for (const IoHash& ChunkHash : DeletedChunks)
- {
- DeletedSize += LocationMap[ChunkHash].GetSize();
- }
+ },
+ [&GcCtx]() { return GcCtx.CollectSmallObjects(); });
GcCtx.DeletedCas(DeletedChunks);
}
@@ -904,7 +540,7 @@ CasContainerStrategy::MakeIndexSnapshot()
ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName);
uint64_t EntryCount = 0;
Stopwatch Timer;
- const auto _ = MakeGuard([this, &EntryCount, &Timer] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
EntryCount,
@@ -935,7 +571,6 @@ CasContainerStrategy::MakeIndexSnapshot()
std::vector<CasDiskIndexEntry> Entries;
{
- RwLock::SharedLockScope __(m_InsertLock);
RwLock::SharedLockScope ___(m_LocationMapLock);
Entries.resize(m_LocationMap.size());
@@ -990,7 +625,7 @@ CasContainerStrategy::ReadIndexFile()
if (std::filesystem::is_regular_file(IndexPath))
{
Stopwatch Timer;
- const auto _ = MakeGuard([this, &Entries, &Timer] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("read store '{}' index containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
Entries.size(),
@@ -1043,7 +678,7 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
if (std::filesystem::is_regular_file(LogPath))
{
Stopwatch Timer;
- const auto _ = MakeGuard([this, &Entries, &Timer] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("read store '{}' log containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
Entries.size(),
@@ -1103,7 +738,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
uint32_t MigratedBlockCount = 0;
Stopwatch MigrationTimer;
uint64_t TotalSize = 0;
- const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
m_Config.RootDirectory / m_ContainerBaseName,
MigratedChunkCount,
@@ -1112,32 +747,13 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
NiceBytes(TotalSize));
});
- uint32_t WriteBlockIndex = 0;
- while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ uint64_t BlockFileSize = 0;
{
- ++WriteBlockIndex;
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+ BlockFileSize = BlockFile.FileSize();
}
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
- return 0;
- }
-
- if (Space.Free < m_MaxBlockSize)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- m_MaxBlockSize,
- NiceBytes(Space.Free));
- return 0;
- }
-
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
-
std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
uint64_t InvalidEntryCount = 0;
@@ -1145,7 +761,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
{
Stopwatch Timer;
- const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] {
+ const auto __ = MakeGuard([&] {
ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
LegacyDiskIndex.size(),
@@ -1173,7 +789,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
0);
std::vector<IoHash> BadEntries;
- uint64_t BlockFileSize = BlockFile.FileSize();
for (const auto& Entry : LegacyDiskIndex)
{
const LegacyCasDiskIndexEntry& Record(Entry.second);
@@ -1199,7 +814,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
if (LegacyDiskIndex.empty())
{
- BlockFile.Close();
LegacyCasLog.Close();
if (CleanSource)
{
@@ -1218,219 +832,75 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
return 0;
}
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
- TotalSize += Record.Location.GetSize();
- }
-
- uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size());
- uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize;
- if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- MaxRequiredBlockCount,
- BlockStoreDiskLocation::MaxBlockIndex);
- return 0;
- }
-
- constexpr const uint64_t DiskReserve = 1ul << 28;
-
- if (CleanSource)
- {
- if (Space.Free < (m_MaxBlockSize + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(m_MaxBlockSize + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
- else
- {
- if (Space.Free < (RequiredDiskSpace + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(RequiredDiskSpace + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
-
std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
CreateDirectories(LogPath.parent_path());
TCasLogFile<CasDiskIndexEntry> CasLog;
CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- if (CleanSource && (MaxRequiredBlockCount < 2))
- {
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(LegacyDiskIndex.size());
-
- // We can use the block as is, just move it and add the blocks to our new log
- for (auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
-
- BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()};
- BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment);
- LogEntries.push_back(
- {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
- }
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- CreateDirectories(BlockPath.parent_path());
- BlockFile.Close();
- std::filesystem::rename(LegacyDataPath, BlockPath);
- CasLog.Append(LogEntries);
- for (const CasDiskIndexEntry& Entry : LogEntries)
- {
- m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
- }
-
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
- }
- else
+ std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size());
+ ChunkLocations.reserve(LegacyDiskIndex.size());
+ for (const auto& Entry : LegacyDiskIndex)
{
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(LegacyDiskIndex.size());
- for (const auto& Entry : LegacyDiskIndex)
- {
- ChunkHashes.push_back(Entry.first);
- }
-
- std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
- auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
- auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
- return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset();
- });
-
- uint64_t BlockSize = 0;
- uint64_t BlockOffset = 0;
- std::vector<BlockStoreLocation> NewLocations;
- struct BlockData
- {
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- uint64_t BlockOffset;
- uint64_t BlockSize;
- uint32_t BlockIndex;
- };
-
- std::vector<BlockData> BlockRanges;
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- BlockRanges.reserve(MaxRequiredBlockCount);
- for (const IoHash& ChunkHash : ChunkHashes)
- {
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
- const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
-
- uint64_t ChunkOffset = LegacyChunkLocation.GetOffset();
- uint64_t ChunkSize = LegacyChunkLocation.GetSize();
- uint64_t ChunkEnd = ChunkOffset + ChunkSize;
-
- if (BlockSize == 0)
- {
- BlockOffset = ChunkOffset;
- }
- if ((ChunkEnd - BlockOffset) > m_MaxBlockSize)
- {
- BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
- BlockRange.Chunks.swap(Chunks);
- BlockRanges.push_back(BlockRange);
-
- WriteBlockIndex++;
- while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
- {
- ++WriteBlockIndex;
- }
- BlockOffset = ChunkOffset;
- BlockSize = 0;
- }
- BlockSize = RoundUp(BlockSize, m_PayloadAlignment);
- BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
- Chunks.push_back({ChunkHash, ChunkLocation});
- BlockSize = ChunkEnd - BlockOffset;
- }
- if (BlockSize > 0)
- {
- BlockRanges.push_back(
- {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
- }
- Stopwatch WriteBlockTimer;
-
- std::reverse(BlockRanges.begin(), BlockRanges.end());
- std::vector<std::uint8_t> Buffer(1 << 28);
- for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
- {
- const BlockData& BlockRange = BlockRanges[Idx];
- if (Idx > 0)
- {
- uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
- uint64_t Completed = BlockOffset + BlockSize - Remaining;
- uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
-
- ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- Idx,
- BlockRanges.size(),
- NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
- NiceBytes(BlockOffset + BlockSize),
- NiceTimeSpanMs(ETA));
- }
-
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex);
- BlockStoreFile ChunkBlock(BlockPath);
- ChunkBlock.Create(BlockRange.BlockSize);
- uint64_t Offset = 0;
- while (Offset < BlockRange.BlockSize)
- {
- uint64_t Size = BlockRange.BlockSize - Offset;
- if (Size > Buffer.size())
- {
- Size = Buffer.size();
- }
- BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
- ChunkBlock.Write(Buffer.data(), Size, Offset);
- Offset += Size;
- }
- ChunkBlock.Truncate(Offset);
- ChunkBlock.Flush();
-
+ const LegacyCasDiskLocation& Location = Entry.second.Location;
+ const IoHash& ChunkHash = Entry.first;
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()});
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ TotalSize += Location.GetSize();
+ }
+ m_BlockStore.Split(
+ ChunkLocations,
+ LegacyDataPath,
+ m_BlocksBasePath,
+ m_MaxBlockSize,
+ BlockStoreDiskLocation::MaxBlockIndex + 1,
+ m_PayloadAlignment,
+ CleanSource,
+ [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
+ const BlockStore::MovedChunksArray& MovedChunks) {
std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first];
- BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment);
- LogEntries.push_back(
- {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags});
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ LogEntries.push_back({.Key = ChunkHash,
+ .Location = {NewLocation, m_PayloadAlignment},
+ .ContentType = OldEntry.ContentType,
+ .Flags = OldEntry.Flags});
}
- CasLog.Append(LogEntries);
for (const CasDiskIndexEntry& Entry : LogEntries)
{
m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
}
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
-
+ CasLog.Append(LogEntries);
+ CasLog.Flush();
if (CleanSource)
{
std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LegacyLogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone});
+ size_t ChunkIndex = Entry.first;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ LegacyLogEntries.push_back(
+ LegacyCasDiskIndexEntry{.Key = ChunkHash,
+ .Location = OldEntry.Location,
+ .ContentType = OldEntry.ContentType,
+ .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)});
}
LegacyCasLog.Append(LegacyLogEntries);
- BlockFile.SetFileSize(BlockRange.BlockOffset);
+ LegacyCasLog.Flush();
}
- }
- }
+ MigratedBlockCount++;
+ MigratedChunkCount += MovedChunks.size();
+ });
- BlockFile.Close();
LegacyCasLog.Close();
CasLog.Close();
@@ -1480,67 +950,16 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- std::unordered_set<uint32_t> KnownBlocks;
+ std::vector<BlockStoreLocation> KnownLocations;
+ KnownLocations.reserve(m_LocationMap.size());
for (const auto& Entry : m_LocationMap)
{
const BlockStoreDiskLocation& Location = Entry.second;
- m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_seq_cst);
- KnownBlocks.insert(Location.GetBlockIndex());
+ m_TotalSize.fetch_add(Location.GetSize(), std::memory_order::relaxed);
+ KnownLocations.push_back(Location.Get(m_PayloadAlignment));
}
- if (std::filesystem::is_directory(m_BlocksBasePath))
- {
- std::vector<std::filesystem::path> FoldersToScan;
- FoldersToScan.push_back(m_BlocksBasePath);
- size_t FolderOffset = 0;
- while (FolderOffset < FoldersToScan.size())
- {
- for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
- {
- if (Entry.is_directory())
- {
- FoldersToScan.push_back(Entry.path());
- continue;
- }
- if (Entry.is_regular_file())
- {
- const std::filesystem::path Path = Entry.path();
- if (Path.extension() != DataExtension)
- {
- continue;
- }
- std::string FileName = Path.stem().string();
- uint32_t BlockIndex;
- bool OK = ParseHexNumber(FileName, BlockIndex);
- if (!OK)
- {
- continue;
- }
- if (!KnownBlocks.contains(BlockIndex))
- {
- // Log removing unreferenced block
- // Clear out unused blocks
- ZEN_INFO("removing unused block for '{}' at '{}'", m_ContainerBaseName, Path);
- std::error_code Ec;
- std::filesystem::remove(Path, Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
- }
- continue;
- }
- Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
- BlockFile->Open();
- m_ChunkBlocks[BlockIndex] = BlockFile;
- }
- }
- ++FolderOffset;
- }
- }
- else
- {
- CreateDirectories(m_BlocksBasePath);
- }
+ m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0))
{
@@ -2195,7 +1614,7 @@ TEST_CASE("compactcas.legacyconversion")
Gc.CollectGarbage(GcCtx);
}
- std::filesystem::path BlockPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
+ std::filesystem::path BlockPath = BlockStore::GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test");
std::filesystem::rename(BlockPath, LegacyDataPath);
@@ -2261,7 +1680,7 @@ TEST_CASE("compactcas.legacyconversion")
}
}
-TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
+TEST_CASE("compactcas.threadedinsert")
{
// for (uint32_t i = 0; i < 100; ++i)
{
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index 11da37202..114a6a48c 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -78,17 +78,12 @@ private:
TCasLogFile<CasDiskIndexEntry> m_CasLog;
std::string m_ContainerBaseName;
std::filesystem::path m_BlocksBasePath;
+ BlockStore m_BlockStore;
RwLock m_LocationMapLock;
typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t;
LocationMap_t m_LocationMap;
- std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
- RwLock m_InsertLock; // used to serialize inserts
- Ref<BlockStoreFile> m_WriteBlock;
- std::uint64_t m_CurrentInsertOffset = 0;
-
- std::atomic_uint32_t m_WriteBlockIndex{};
std::atomic_uint64_t m_TotalSize{};
};
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index b53cfaa54..d074a906f 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -92,7 +92,7 @@ FileCasStrategy::Initialize(bool IsNewStore)
m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
Stopwatch Timer;
- const auto _ = MakeGuard([this, &Timer] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
});
@@ -692,7 +692,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
Stopwatch TotalTimer;
- const auto _ = MakeGuard([this, &TotalTimer, &DeletedCount, &ChunkCount, OldTotalSize] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}",
m_Config.RootDirectory,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 856f9af02..4b50668d9 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -76,7 +76,7 @@ namespace {
return MakeErrorCodeFromLastError();
}
bool Keep = true;
- auto _ = MakeGuard([FileHandle, &Keep, Path]() {
+ auto _ = MakeGuard([&]() {
::CloseHandle(FileHandle);
if (!Keep)
{
@@ -105,7 +105,7 @@ namespace {
}
bool Keep = true;
- auto _ = MakeGuard([Fd, &Keep, Path]() {
+ auto _ = MakeGuard([&]() {
close(Fd);
if (!Keep)
{
@@ -212,9 +212,8 @@ GcContext::ContributeCas(std::span<const IoHash> Cas)
}
void
-GcContext::ContributeCacheKeys(const std::string& Bucket, std::vector<IoHash> ValidKeys, std::vector<IoHash> ExpiredKeys)
+GcContext::ContributeCacheKeys(const std::string& Bucket, std::vector<IoHash>&& ExpiredKeys)
{
- m_State->m_CacheBuckets[Bucket].ValidKeys = std::move(ValidKeys);
m_State->m_CacheBuckets[Bucket].ExpiredKeys = std::move(ExpiredKeys);
}
@@ -255,12 +254,6 @@ GcContext::DeletedCas()
}
std::span<const IoHash>
-GcContext::ValidCacheKeys(const std::string& Bucket) const
-{
- return m_State->m_CacheBuckets[Bucket].ValidKeys;
-}
-
-std::span<const IoHash>
GcContext::ExpiredCacheKeys(const std::string& Bucket) const
{
return m_State->m_CacheBuckets[Bucket].ExpiredKeys;
@@ -399,7 +392,7 @@ CasGc::CollectGarbage(GcContext& GcCtx)
// First gather reference set
{
Stopwatch Timer;
- const auto Guard = MakeGuard([this, &Timer] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ const auto Guard = MakeGuard([&] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
for (GcContributor* Contributor : m_GcContribs)
{
Contributor->GatherReferences(GcCtx);
@@ -440,7 +433,7 @@ CasGc::CollectGarbage(GcContext& GcCtx)
{
Stopwatch Timer;
- const auto Guard = MakeGuard([this, &Timer] { ZEN_INFO("collected garbage in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ const auto Guard = MakeGuard([&] { ZEN_INFO("collected garbage in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
for (GcStorage* Storage : m_GcStorage)
{
Storage->CollectGarbage(GcCtx);
@@ -452,8 +445,7 @@ CasGc::CollectGarbage(GcContext& GcCtx)
if (CidStore* CidStore = m_CidStore)
{
Stopwatch Timer;
- const auto Guard =
- MakeGuard([this, &Timer] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ const auto Guard = MakeGuard([&] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
CidStore->RemoveCids(GcCtx.DeletedCas());
}
}
@@ -679,8 +671,7 @@ GcScheduler::SchedulerThread()
NiceTimeSpanMs(uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(GcCtx.MaxCacheDuration()).count())));
{
Stopwatch Timer;
- const auto __ =
- MakeGuard([this, &Timer] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
m_CasGc.CollectGarbage(GcCtx);
diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h
index 5a500c65f..ce9988776 100644
--- a/zenstore/include/zenstore/basicfile.h
+++ b/zenstore/include/zenstore/basicfile.h
@@ -33,11 +33,12 @@ public:
enum class Mode : uint32_t
{
- kRead = 0, // Opens a existing file for read only
- kWrite = 1, // Opens (or creates) a file for read and write
- kTruncate = 2, // Opens (or creates) a file for read and write and sets the size to zero
- kDelete = 3, // Opens (or creates) a file for read and write enabling MarkAsDeleteOnClose()
- kTruncateDelete = 4 // Opens (or creates) a file for read and write and sets the size to zero enabling MarkAsDeleteOnClose()
+ kRead = 0, // Opens a existing file for read only
+ kWrite = 1, // Opens (or creates) a file for read and write
+ kTruncate = 2, // Opens (or creates) a file for read and write and sets the size to zero
+ kDelete = 3, // Opens (or creates) a file for read and write allowing .DeleteFile file disposition to be set
+ kTruncateDelete =
+ 4 // Opens (or creates) a file for read and write and sets the size to zero allowing .DeleteFile file disposition to be set
};
void Open(const std::filesystem::path& FileName, Mode Mode);
@@ -55,7 +56,6 @@ public:
void SetFileSize(uint64_t FileSize);
IoBuffer ReadAll();
void WriteAll(IoBuffer Data, std::error_code& Ec);
- void MarkAsDeleteOnClose(std::error_code& Ec);
void* Detach();
inline void* Handle() { return m_FileHandle; }
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
index 424db461a..34c475fb6 100644
--- a/zenstore/include/zenstore/blockstore.h
+++ b/zenstore/include/zenstore/blockstore.h
@@ -6,6 +6,9 @@
#include <zencore/zencore.h>
#include <zenstore/basicfile.h>
+#include <unordered_map>
+#include <unordered_set>
+
namespace zen {
//////////////////////////////////////////////////////////////////////////
@@ -15,6 +18,8 @@ struct BlockStoreLocation
uint32_t BlockIndex;
uint64_t Offset;
uint64_t Size;
+
+ inline auto operator<=>(const BlockStoreLocation& Rhs) const = default;
};
#pragma pack(push)
@@ -84,13 +89,14 @@ struct BlockStoreFile : public RefCounted
const std::filesystem::path& GetPath() const;
void Open();
void Create(uint64_t InitialSize);
- void MarkAsDeleteOnClose(std::error_code& Ec);
+ void MarkAsDeleteOnClose();
uint64_t FileSize();
IoBuffer GetChunk(uint64_t Offset, uint64_t Size);
void Read(void* Data, uint64_t Size, uint64_t FileOffset);
void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
void Truncate(uint64_t Size);
void Flush();
+ BasicFile& GetBasicFile();
void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
private:
@@ -99,6 +105,77 @@ private:
BasicFile m_File;
};
+class BlockStore
+{
+public:
+ struct ReclaimSnapshotState
+ {
+ std::unordered_set<uint32_t> m_ActiveWriteBlocks;
+ size_t BlockCount;
+ };
+
+ typedef std::vector<std::pair<size_t, BlockStoreLocation>> MovedChunksArray;
+ typedef std::vector<size_t> ChunkIndexArray;
+
+ typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback;
+ typedef std::function<uint64_t()> ClaimDiskReserveCallback;
+ typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback;
+ typedef std::function<void(size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size)>
+ IterateChunksLargeSizeCallback;
+ typedef std::function<void(const MovedChunksArray& MovedChunks)> SplitCallback;
+ typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback;
+
+ void Initialize(const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ const std::vector<BlockStoreLocation>& KnownLocations);
+ void Close();
+
+ void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback);
+
+ IoBuffer TryGetChunk(const BlockStoreLocation& Location);
+ void Flush();
+
+ ReclaimSnapshotState GetReclaimSnapshotState();
+ void ReclaimSpace(
+ const ReclaimSnapshotState& Snapshot,
+ const std::vector<BlockStoreLocation>& ChunkLocations,
+ const ChunkIndexArray& KeepChunkIndexes,
+ uint64_t PayloadAlignment,
+ bool DryRun,
+ const ReclaimCallback& ChangeCallback = [](const MovedChunksArray&, const ChunkIndexArray&) {},
+ const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; });
+
+ void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
+ IterateChunksSmallSizeCallback SmallSizeCallback,
+ IterateChunksLargeSizeCallback LargeSizeCallback);
+
+ static bool Split(const std::vector<BlockStoreLocation>& ChunkLocations,
+ const std::filesystem::path& SourceBlockFilePath,
+ const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ size_t PayloadAlignment,
+ bool CleanSource,
+ const SplitCallback& Callback);
+
+ static const char* GetBlockFileExtension();
+ static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);
+
+private:
+ std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
+
+ RwLock m_InsertLock; // used to serialize inserts
+ Ref<BlockStoreFile> m_WriteBlock;
+ std::uint64_t m_CurrentInsertOffset = 0;
+ std::atomic_uint32_t m_WriteBlockIndex{};
+ std::vector<uint32_t> m_ActiveWriteBlocks;
+
+ uint64_t m_MaxBlockSize = 1u << 28;
+ uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1;
+ std::filesystem::path m_BlocksBasePath;
+};
+
void blockstore_forcelink();
} // namespace zen
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index bc8dee9a3..6268588ec 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -53,7 +53,7 @@ public:
void ContributeCids(std::span<const IoHash> Cid);
void ContributeCas(std::span<const IoHash> Hash);
- void ContributeCacheKeys(const std::string& Bucket, std::vector<IoHash> ValidKeys, std::vector<IoHash> ExpiredKeys);
+ void ContributeCacheKeys(const std::string& Bucket, std::vector<IoHash>&& ExpiredKeys);
void IterateCids(std::function<void(const IoHash&)> Callback);
@@ -64,7 +64,6 @@ public:
void DeletedCas(std::span<const IoHash> Cas);
CasChunkSet& DeletedCas();
- std::span<const IoHash> ValidCacheKeys(const std::string& Bucket) const;
std::span<const IoHash> ExpiredCacheKeys(const std::string& Bucket) const;
bool IsDeletionMode() const;