aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-03 22:23:26 +0200
committerGitHub <[email protected]>2022-05-03 22:23:26 +0200
commitc5b2435192f382fbaa39a8ff67de16ee3b69b7a6 (patch)
tree294bb596d61582744dd7901f6a464c324bdec3d2 /zenstore/compactcas.cpp
parentMerge pull request #84 from EpicGames/de/cleanup-lock-sharding-in-iobuffer (diff)
parentmacos compilation fix (diff)
downloadzen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.tar.xz
zen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.zip
Merge pull request #86 from EpicGames/de/block-store-refactor
structured cache with block store
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp996
1 files changed, 213 insertions, 783 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 920ed965f..7cc742beb 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -48,25 +48,8 @@ struct CasDiskIndexHeader
static_assert(sizeof(CasDiskIndexHeader) == 32);
namespace {
- std::vector<CasDiskIndexEntry> MakeCasDiskEntries(const std::unordered_map<IoHash, BlockStoreDiskLocation>& MovedChunks,
- const std::vector<IoHash>& DeletedChunks)
- {
- std::vector<CasDiskIndexEntry> result;
- result.reserve(MovedChunks.size());
- for (const auto& MovedEntry : MovedChunks)
- {
- result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second});
- }
- for (const IoHash& ChunkHash : DeletedChunks)
- {
- result.push_back({.Key = ChunkHash, .Flags = CasDiskIndexEntry::kTombstone});
- }
- return result;
- }
-
const char* IndexExtension = ".uidx";
const char* LogExtension = ".ulog";
- const char* DataExtension = ".ucas";
std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
{
@@ -93,22 +76,6 @@ namespace {
return GetBasePath(RootPath, ContainerBaseName) / "blocks";
}
- std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
- {
- ExtendablePathBuilder<256> Path;
-
- char BlockHexString[9];
- ToHexNumber(BlockIndex, BlockHexString);
-
- Path.Append(BlocksBasePath);
- Path.AppendSeparator();
- Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
- Path.AppendSeparator();
- Path.Append(BlockHexString);
- Path.Append(DataExtension);
- return Path.ToPath();
- }
-
std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
{
return RootPath / (ContainerBaseName + LogExtension);
@@ -116,7 +83,7 @@ namespace {
std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
{
- return RootPath / (ContainerBaseName + DataExtension);
+ return RootPath / (ContainerBaseName + ".ucas");
}
std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
@@ -263,53 +230,12 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
- uint32_t WriteBlockIndex;
- Ref<BlockStoreFile> WriteBlock;
- uint64_t InsertOffset;
{
- RwLock::ExclusiveLockScope _(m_InsertLock);
-
- {
- RwLock::SharedLockScope __(m_LocationMapLock);
- if (m_LocationMap.contains(ChunkHash))
- {
- return CasStore::InsertResult{.New = false};
- }
- }
-
- // New entry
-
- WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- bool IsWriting = m_WriteBlock != nullptr;
- if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize)
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ if (m_LocationMap.contains(ChunkHash))
{
- if (m_WriteBlock)
- {
- m_WriteBlock = nullptr;
- }
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
- {
- throw std::runtime_error(
- fmt::format("unable to allocate a new block in '{}'", m_Config.RootDirectory / m_ContainerBaseName));
- }
- WriteBlockIndex += IsWriting ? 1 : 0;
- while (m_ChunkBlocks.contains(WriteBlockIndex))
- {
- WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
- }
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- m_WriteBlock = new BlockStoreFile(BlockPath);
- m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
- }
- m_CurrentInsertOffset = 0;
- m_WriteBlock->Create(m_MaxBlockSize);
+ return CasStore::InsertResult{.New = false};
}
- InsertOffset = m_CurrentInsertOffset;
- m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment);
- WriteBlock = m_WriteBlock;
}
// We can end up in a situation that InsertChunk writes the same chunk data in
@@ -324,17 +250,15 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
// This should be a rare occasion and the current flow reduces the time we block for
// reads, insert and GC.
- BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment);
- const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
-
- WriteBlock->Write(ChunkData, ChunkSize, InsertOffset);
+ BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment);
+ BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
+ const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
m_CasLog.Append(IndexEntry);
-
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
{
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, Location);
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, DiskLocation);
}
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
return CasStore::InsertResult{.New = true};
}
@@ -348,20 +272,21 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
- Ref<BlockStoreFile> ChunkBlock;
- BlockStoreLocation Location;
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ auto KeyIt = m_LocationMap.find(ChunkHash);
+ if (KeyIt == m_LocationMap.end())
{
- RwLock::SharedLockScope _(m_LocationMapLock);
- if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
- {
- Location = KeyIt->second.Get(m_PayloadAlignment);
- ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
- }
- else
- {
- return IoBuffer();
- }
+ return IoBuffer();
+ }
+ BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment);
+ _.ReleaseNow();
+
+ Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location);
+ if (!ChunkBlock)
+ {
+ return IoBuffer();
}
+
return ChunkBlock->GetChunk(Location.Offset, Location.Size);
}
@@ -388,128 +313,94 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
CasContainerStrategy::Flush()
{
- {
- RwLock::ExclusiveLockScope _(m_InsertLock);
- if (m_CurrentInsertOffset > 0)
- {
- uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
- m_WriteBlock = nullptr;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
- m_CurrentInsertOffset = 0;
- }
- }
+ m_BlockStore.Flush();
MakeIndexSnapshot();
}
void
CasContainerStrategy::Scrub(ScrubContext& Ctx)
{
- std::vector<CasDiskIndexEntry> BadChunks;
-
- // We do a read sweep through the payloads file and validate
- // any entries that are contained within each segment, with
- // the assumption that most entries will be checked in this
- // pass. An alternative strategy would be to use memory mapping.
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ uint64_t TotalChunkCount = m_LocationMap.size();
+ std::vector<BlockStoreLocation> ChunkLocations;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
{
- std::vector<CasDiskIndexEntry> BigChunks;
- const uint64_t WindowSize = 4 * 1024 * 1024;
- IoBuffer ReadBuffer{WindowSize};
- void* BufferBase = ReadBuffer.MutableData();
-
- RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time?
- RwLock::SharedLockScope __(m_LocationMapLock);
-
- for (const auto& Block : m_ChunkBlocks)
+ for (const auto& Entry : m_LocationMap)
{
- uint64_t WindowStart = 0;
- uint64_t WindowEnd = WindowSize;
- const Ref<BlockStoreFile>& BlockFile = Block.second;
- BlockFile->Open();
- const uint64_t FileSize = BlockFile->FileSize();
-
- do
- {
- const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
- BlockFile->Read(BufferBase, ChunkSize, WindowStart);
-
- for (auto& Entry : m_LocationMap)
- {
- const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment);
- const uint64_t EntryOffset = Location.Offset;
+ const IoHash& ChunkHash = Entry.first;
+ const BlockStoreDiskLocation& DiskLocation = Entry.second;
+ BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
- if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
- {
- const uint64_t EntryEnd = EntryOffset + Location.Size;
-
- if (EntryEnd >= WindowEnd)
- {
- BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
-
- continue;
- }
-
- const IoHash ComputedHash =
- IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, Location.Size);
-
- if (Entry.first != ComputedHash)
- {
- // Hash mismatch
- BadChunks.push_back({.Key = Entry.first, .Location = Entry.second, .Flags = CasDiskIndexEntry::kTombstone});
- }
- }
- }
-
- WindowStart += WindowSize;
- WindowEnd += WindowSize;
- } while (WindowStart < FileSize);
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
}
+ }
- // Deal with large chunks
-
- for (const CasDiskIndexEntry& Entry : BigChunks)
- {
- IoHashStream Hasher;
- const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment);
- const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex];
- BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
+ std::vector<IoHash> BadKeys;
- if (Entry.Key != ComputedHash)
+ m_BlockStore.IterateChunks(
+ ChunkLocations,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ const IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
+ const IoHash& ExpectedHash = ChunkIndexToChunkHash[ChunkIndex];
+ if (ComputedHash != ExpectedHash)
{
- BadChunks.push_back({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone});
+ // Hash mismatch
+ BadKeys.push_back(ExpectedHash);
}
- }
- }
+ },
+ [&](size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size) {
+ IoHashStream Hasher;
+ BlockFile->StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
+ IoHash ComputedHash = Hasher.GetHash();
+ const IoHash& ExpectedHash = ChunkIndexToChunkHash[ChunkIndex];
+ if (ComputedHash != ExpectedHash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(ExpectedHash);
+ }
+ });
- if (BadChunks.empty())
+ if (BadKeys.empty())
{
return;
}
- ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_Config.RootDirectory / m_ContainerBaseName);
+ ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_Config.RootDirectory / m_ContainerBaseName);
- // Deal with bad chunks by removing them from our lookup map
+ _.ReleaseNow();
- std::vector<IoHash> BadChunkHashes;
- BadChunkHashes.reserve(BadChunks.size());
-
- m_CasLog.Append(BadChunks);
+ if (Ctx.RunRecovery())
{
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- for (const CasDiskIndexEntry& Entry : BadChunks)
+ // Deal with bad chunks by removing them from our lookup map
+
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
{
- BadChunkHashes.push_back(Entry.Key);
- m_LocationMap.erase(Entry.Key);
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ for (const IoHash& ChunkHash : BadKeys)
+ {
+ const auto KeyIt = m_LocationMap.find(ChunkHash);
+ if (KeyIt == m_LocationMap.end())
+ {
+ // Might have been GC'd
+ continue;
+ }
+ LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
+ m_LocationMap.erase(KeyIt);
+ }
}
+ m_CasLog.Append(LogEntries);
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
-
- Ctx.ReportBadCasChunks(BadChunkHashes);
+ Ctx.ReportBadCasChunks(BadKeys);
}
void
@@ -533,93 +424,33 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// We update the index as we complete each new block file. This makes it possible
// to break the GC if we want to limit time for execution.
//
- // GC can fairly parallell to regular operation - it will block while taking
- // a snapshot of the current m_LocationMap state.
- //
- // While moving blocks it will do a blocking operation and update the m_LocationMap
- // after each new block is written and figuring out the path to the next new block.
+ // GC can very parallell to regular operation - it will block while taking
+ // a snapshot of the current m_LocationMap state and while moving blocks it will
+ // do a blocking operation and update the m_LocationMap after each new block is
+ // written and figuring out the path to the next new block.
ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+
uint64_t WriteBlockTimeUs = 0;
uint64_t WriteBlockLongestTimeUs = 0;
uint64_t ReadBlockTimeUs = 0;
uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = 0;
- uint64_t DeletedSize = 0;
- uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
- std::vector<IoHash> DeletedChunks;
- uint64_t MovedCount = 0;
-
- Stopwatch TotalTimer;
- const auto _ = MakeGuard([this,
- &TotalTimer,
- &WriteBlockTimeUs,
- &WriteBlockLongestTimeUs,
- &ReadBlockTimeUs,
- &ReadBlockLongestTimeUs,
- &TotalChunkCount,
- &DeletedChunks,
- &MovedCount,
- &DeletedSize,
- OldTotalSize] {
- ZEN_INFO(
- "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
- "#{} "
- "of #{} "
- "chunks ({}).",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs),
- NiceBytes(DeletedSize),
- DeletedChunks.size(),
- MovedCount,
- TotalChunkCount,
- NiceBytes(OldTotalSize));
- });
-
- LocationMap_t LocationMap;
- size_t BlockCount;
- uint64_t ExcludeBlockIndex = 0x800000000ull;
+ LocationMap_t LocationMap;
+ BlockStore::ReclaimSnapshotState BlockStoreState;
{
- RwLock::SharedLockScope __(m_InsertLock);
RwLock::SharedLockScope ___(m_LocationMapLock);
- {
- Stopwatch Timer;
- const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (m_WriteBlock)
- {
- ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- }
- __.ReleaseNow();
- }
- LocationMap = m_LocationMap;
- BlockCount = m_ChunkBlocks.size();
- }
-
- if (LocationMap.empty())
- {
- ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName);
- return;
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ LocationMap = m_LocationMap;
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
}
- TotalChunkCount = LocationMap.size();
-
- std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
- std::vector<std::vector<IoHash>> KeepChunks;
- std::vector<std::vector<IoHash>> DeleteChunks;
-
- BlockIndexToChunkMapIndex.reserve(BlockCount);
- KeepChunks.reserve(BlockCount);
- DeleteChunks.reserve(BlockCount);
- size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
+ uint64_t TotalChunkCount = LocationMap.size();
std::vector<IoHash> TotalChunkHashes;
TotalChunkHashes.reserve(TotalChunkCount);
@@ -628,272 +459,82 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
TotalChunkHashes.push_back(Entry.first);
}
- uint64_t DeleteCount = 0;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ BlockStore::ChunkIndexArray KeepChunkIndexes;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
- uint64_t NewTotalSize = 0;
GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
- auto KeyIt = LocationMap.find(ChunkHash);
- const BlockStoreDiskLocation& Location = KeyIt->second;
- uint32_t BlockIndex = Location.GetBlockIndex();
+ auto KeyIt = LocationMap.find(ChunkHash);
+ const BlockStoreDiskLocation& DiskLocation = KeyIt->second;
+ BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
- if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex)
- {
- return;
- }
-
- auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex);
- size_t ChunkMapIndex = 0;
- if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
- {
- ChunkMapIndex = KeepChunks.size();
- BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex;
- KeepChunks.resize(ChunkMapIndex + 1);
- KeepChunks.back().reserve(GuesstimateCountPerBlock);
- DeleteChunks.resize(ChunkMapIndex + 1);
- DeleteChunks.back().reserve(GuesstimateCountPerBlock);
- }
- else
- {
- ChunkMapIndex = BlockIndexPtr->second;
- }
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
if (Keep)
{
- std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex];
- ChunkMap.push_back(ChunkHash);
- NewTotalSize += Location.GetSize();
- }
- else
- {
- std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
- ChunkMap.push_back(ChunkHash);
- DeleteCount++;
+ KeepChunkIndexes.push_back(ChunkIndex);
}
});
- std::unordered_set<uint32_t> BlocksToReWrite;
- BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
- for (const auto& Entry : BlockIndexToChunkMapIndex)
- {
- uint32_t BlockIndex = Entry.first;
- size_t ChunkMapIndex = Entry.second;
- const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
- if (ChunkMap.empty())
- {
- continue;
- }
- BlocksToReWrite.insert(BlockIndex);
- }
-
const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
if (!PerformDelete)
{
- uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
- ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- DeleteCount,
- NiceBytes(TotalSize - NewTotalSize),
- TotalChunkCount,
- NiceBytes(TotalSize));
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
return;
}
- // Move all chunks in blocks that have chunks removed to new blocks
-
- Ref<BlockStoreFile> NewBlockFile;
- uint64_t WriteOffset = 0;
- uint32_t NewBlockIndex = 0;
- DeletedChunks.reserve(DeleteCount);
-
- auto UpdateLocations = [this](const std::span<CasDiskIndexEntry>& Entries) {
- for (const CasDiskIndexEntry& Entry : Entries)
- {
- if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+ std::vector<IoHash> DeletedChunks;
+ m_BlockStore.ReclaimSpace(
+ BlockStoreState,
+ ChunkLocations,
+ KeepChunkIndexes,
+ m_PayloadAlignment,
+ false,
+ [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- auto KeyIt = m_LocationMap.find(Entry.Key);
- uint64_t ChunkSize = KeyIt->second.GetSize();
- m_TotalSize.fetch_sub(ChunkSize);
- m_LocationMap.erase(KeyIt);
- continue;
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}});
+ }
+ for (const size_t ChunkIndex : RemovedChunks)
+ {
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash];
+ LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone});
+ DeletedChunks.push_back(ChunkHash);
}
- m_LocationMap[Entry.Key] = Entry.Location;
- }
- };
-
- std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks;
- for (uint32_t BlockIndex : BlocksToReWrite)
- {
- const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
-
- Ref<BlockStoreFile> OldBlockFile;
- {
- RwLock::SharedLockScope _i(m_LocationMapLock);
- OldBlockFile = m_ChunkBlocks[BlockIndex];
- }
- const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex];
- if (KeepMap.empty())
- {
- const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
- std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap);
m_CasLog.Append(LogEntries);
m_CasLog.Flush();
{
- RwLock::ExclusiveLockScope _i(m_LocationMapLock);
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
Stopwatch Timer;
- const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ const auto ____ = MakeGuard([&] {
uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
ReadBlockTimeUs += ElapsedUs;
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
- UpdateLocations(LogEntries);
- m_ChunkBlocks[BlockIndex] = nullptr;
- }
- DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
- ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'",
- m_ContainerBaseName,
- BlockIndex,
- OldBlockFile->GetPath());
- std::error_code Ec;
- OldBlockFile->MarkAsDeleteOnClose(Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
- }
- continue;
- }
-
- std::vector<uint8_t> Chunk;
- for (const IoHash& ChunkHash : KeepMap)
- {
- auto KeyIt = LocationMap.find(ChunkHash);
- const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment);
- Chunk.resize(ChunkLocation.Size);
- OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
-
- if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
- {
- uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
- std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {});
- m_CasLog.Append(LogEntries);
- m_CasLog.Flush();
-
- if (NewBlockFile)
- {
- NewBlockFile->Truncate(WriteOffset);
- NewBlockFile->Flush();
- }
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- UpdateLocations(LogEntries);
- if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
- {
- ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
- m_Config.RootDirectory / m_ContainerBaseName,
- static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
- return;
- }
- while (m_ChunkBlocks.contains(NextBlockIndex))
- {
- NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
- }
- std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
- NewBlockFile = new BlockStoreFile(NewBlockPath);
- m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
- }
-
- MovedCount += MovedBlockChunks.size();
- MovedBlockChunks.clear();
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
- if (Error)
+ for (const CasDiskIndexEntry& Entry : LogEntries)
{
- ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
- return;
- }
- if (Space.Free < m_MaxBlockSize)
- {
- uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve();
- if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ if (Entry.Flags & CasDiskIndexEntry::kTombstone)
{
- ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- m_MaxBlockSize,
- NiceBytes(Space.Free + ReclaimedSpace));
- RwLock::ExclusiveLockScope _l(m_LocationMapLock);
- Stopwatch Timer;
- const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- m_ChunkBlocks.erase(NextBlockIndex);
- return;
+ m_LocationMap.erase(Entry.Key);
+ uint64_t ChunkSize = Entry.Location.GetSize();
+ m_TotalSize.fetch_sub(ChunkSize);
+ continue;
}
-
- ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- ReclaimedSpace,
- NiceBytes(Space.Free + ReclaimedSpace));
+ m_LocationMap[Entry.Key] = Entry.Location;
}
- NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
}
-
- NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
- MovedBlockChunks.emplace(
- ChunkHash,
- BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment));
- WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment);
- }
- Chunk.clear();
- if (NewBlockFile)
- {
- NewBlockFile->Truncate(WriteOffset);
- NewBlockFile->Flush();
- NewBlockFile = {};
- }
-
- const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
- std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap);
- m_CasLog.Append(LogEntries);
- m_CasLog.Flush();
- {
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- UpdateLocations(LogEntries);
- m_ChunkBlocks[BlockIndex] = nullptr;
- }
- MovedCount += MovedBlockChunks.size();
- DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
- MovedBlockChunks.clear();
-
- ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_ContainerBaseName, BlockIndex, OldBlockFile->GetPath());
- std::error_code Ec;
- OldBlockFile->MarkAsDeleteOnClose(Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
- }
- OldBlockFile = nullptr;
- }
-
- for (const IoHash& ChunkHash : DeletedChunks)
- {
- DeletedSize += LocationMap[ChunkHash].GetSize();
- }
+ },
+ [&GcCtx]() { return GcCtx.CollectSmallObjects(); });
GcCtx.DeletedCas(DeletedChunks);
}
@@ -904,7 +545,7 @@ CasContainerStrategy::MakeIndexSnapshot()
ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName);
uint64_t EntryCount = 0;
Stopwatch Timer;
- const auto _ = MakeGuard([this, &EntryCount, &Timer] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
EntryCount,
@@ -935,7 +576,6 @@ CasContainerStrategy::MakeIndexSnapshot()
std::vector<CasDiskIndexEntry> Entries;
{
- RwLock::SharedLockScope __(m_InsertLock);
RwLock::SharedLockScope ___(m_LocationMapLock);
Entries.resize(m_LocationMap.size());
@@ -990,7 +630,7 @@ CasContainerStrategy::ReadIndexFile()
if (std::filesystem::is_regular_file(IndexPath))
{
Stopwatch Timer;
- const auto _ = MakeGuard([this, &Entries, &Timer] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("read store '{}' index containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
Entries.size(),
@@ -1043,7 +683,7 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
if (std::filesystem::is_regular_file(LogPath))
{
Stopwatch Timer;
- const auto _ = MakeGuard([this, &Entries, &Timer] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("read store '{}' log containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
Entries.size(),
@@ -1103,7 +743,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
uint32_t MigratedBlockCount = 0;
Stopwatch MigrationTimer;
uint64_t TotalSize = 0;
- const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] {
+ const auto _ = MakeGuard([&] {
ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
m_Config.RootDirectory / m_ContainerBaseName,
MigratedChunkCount,
@@ -1112,32 +752,13 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
NiceBytes(TotalSize));
});
- uint32_t WriteBlockIndex = 0;
- while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ uint64_t BlockFileSize = 0;
{
- ++WriteBlockIndex;
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+ BlockFileSize = BlockFile.FileSize();
}
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
- return 0;
- }
-
- if (Space.Free < m_MaxBlockSize)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- m_MaxBlockSize,
- NiceBytes(Space.Free));
- return 0;
- }
-
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
-
std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
uint64_t InvalidEntryCount = 0;
@@ -1145,7 +766,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
{
Stopwatch Timer;
- const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] {
+ const auto __ = MakeGuard([&] {
ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
LegacyDiskIndex.size(),
@@ -1173,7 +794,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
0);
std::vector<IoHash> BadEntries;
- uint64_t BlockFileSize = BlockFile.FileSize();
for (const auto& Entry : LegacyDiskIndex)
{
const LegacyCasDiskIndexEntry& Record(Entry.second);
@@ -1199,7 +819,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
if (LegacyDiskIndex.empty())
{
- BlockFile.Close();
LegacyCasLog.Close();
if (CleanSource)
{
@@ -1218,219 +837,75 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
return 0;
}
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
- TotalSize += Record.Location.GetSize();
- }
-
- uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size());
- uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize;
- if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- MaxRequiredBlockCount,
- BlockStoreDiskLocation::MaxBlockIndex);
- return 0;
- }
-
- constexpr const uint64_t DiskReserve = 1ul << 28;
-
- if (CleanSource)
- {
- if (Space.Free < (m_MaxBlockSize + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(m_MaxBlockSize + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
- else
- {
- if (Space.Free < (RequiredDiskSpace + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(RequiredDiskSpace + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
-
std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
CreateDirectories(LogPath.parent_path());
TCasLogFile<CasDiskIndexEntry> CasLog;
CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- if (CleanSource && (MaxRequiredBlockCount < 2))
- {
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(LegacyDiskIndex.size());
-
- // We can use the block as is, just move it and add the blocks to our new log
- for (auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
-
- BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()};
- BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment);
- LogEntries.push_back(
- {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
- }
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- CreateDirectories(BlockPath.parent_path());
- BlockFile.Close();
- std::filesystem::rename(LegacyDataPath, BlockPath);
- CasLog.Append(LogEntries);
- for (const CasDiskIndexEntry& Entry : LogEntries)
- {
- m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
- }
-
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
- }
- else
+ std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size());
+ ChunkLocations.reserve(LegacyDiskIndex.size());
+ for (const auto& Entry : LegacyDiskIndex)
{
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(LegacyDiskIndex.size());
- for (const auto& Entry : LegacyDiskIndex)
- {
- ChunkHashes.push_back(Entry.first);
- }
-
- std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
- auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
- auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
- return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset();
- });
-
- uint64_t BlockSize = 0;
- uint64_t BlockOffset = 0;
- std::vector<BlockStoreLocation> NewLocations;
- struct BlockData
- {
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- uint64_t BlockOffset;
- uint64_t BlockSize;
- uint32_t BlockIndex;
- };
-
- std::vector<BlockData> BlockRanges;
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- BlockRanges.reserve(MaxRequiredBlockCount);
- for (const IoHash& ChunkHash : ChunkHashes)
- {
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
- const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
-
- uint64_t ChunkOffset = LegacyChunkLocation.GetOffset();
- uint64_t ChunkSize = LegacyChunkLocation.GetSize();
- uint64_t ChunkEnd = ChunkOffset + ChunkSize;
-
- if (BlockSize == 0)
- {
- BlockOffset = ChunkOffset;
- }
- if ((ChunkEnd - BlockOffset) > m_MaxBlockSize)
- {
- BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
- BlockRange.Chunks.swap(Chunks);
- BlockRanges.push_back(BlockRange);
-
- WriteBlockIndex++;
- while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
- {
- ++WriteBlockIndex;
- }
- BlockOffset = ChunkOffset;
- BlockSize = 0;
- }
- BlockSize = RoundUp(BlockSize, m_PayloadAlignment);
- BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
- Chunks.push_back({ChunkHash, ChunkLocation});
- BlockSize = ChunkEnd - BlockOffset;
- }
- if (BlockSize > 0)
- {
- BlockRanges.push_back(
- {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
- }
- Stopwatch WriteBlockTimer;
-
- std::reverse(BlockRanges.begin(), BlockRanges.end());
- std::vector<std::uint8_t> Buffer(1 << 28);
- for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
- {
- const BlockData& BlockRange = BlockRanges[Idx];
- if (Idx > 0)
- {
- uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
- uint64_t Completed = BlockOffset + BlockSize - Remaining;
- uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
-
- ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- Idx,
- BlockRanges.size(),
- NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
- NiceBytes(BlockOffset + BlockSize),
- NiceTimeSpanMs(ETA));
- }
-
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex);
- BlockStoreFile ChunkBlock(BlockPath);
- ChunkBlock.Create(BlockRange.BlockSize);
- uint64_t Offset = 0;
- while (Offset < BlockRange.BlockSize)
- {
- uint64_t Size = BlockRange.BlockSize - Offset;
- if (Size > Buffer.size())
- {
- Size = Buffer.size();
- }
- BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
- ChunkBlock.Write(Buffer.data(), Size, Offset);
- Offset += Size;
- }
- ChunkBlock.Truncate(Offset);
- ChunkBlock.Flush();
-
+ const LegacyCasDiskLocation& Location = Entry.second.Location;
+ const IoHash& ChunkHash = Entry.first;
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()});
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ TotalSize += Location.GetSize();
+ }
+ m_BlockStore.Split(
+ ChunkLocations,
+ LegacyDataPath,
+ m_BlocksBasePath,
+ m_MaxBlockSize,
+ BlockStoreDiskLocation::MaxBlockIndex + 1,
+ m_PayloadAlignment,
+ CleanSource,
+ [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
+ const BlockStore::MovedChunksArray& MovedChunks) {
std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first];
- BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment);
- LogEntries.push_back(
- {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags});
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ LogEntries.push_back({.Key = ChunkHash,
+ .Location = {NewLocation, m_PayloadAlignment},
+ .ContentType = OldEntry.ContentType,
+ .Flags = OldEntry.Flags});
}
- CasLog.Append(LogEntries);
for (const CasDiskIndexEntry& Entry : LogEntries)
{
m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
}
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
-
+ CasLog.Append(LogEntries);
+ CasLog.Flush();
if (CleanSource)
{
std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LegacyLogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone});
+ size_t ChunkIndex = Entry.first;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ LegacyLogEntries.push_back(
+ LegacyCasDiskIndexEntry{.Key = ChunkHash,
+ .Location = OldEntry.Location,
+ .ContentType = OldEntry.ContentType,
+ .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)});
}
LegacyCasLog.Append(LegacyLogEntries);
- BlockFile.SetFileSize(BlockRange.BlockOffset);
+ LegacyCasLog.Flush();
}
- }
- }
+ MigratedBlockCount++;
+ MigratedChunkCount += MovedChunks.size();
+ });
- BlockFile.Close();
LegacyCasLog.Close();
CasLog.Close();
@@ -1480,67 +955,16 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- std::unordered_set<uint32_t> KnownBlocks;
+ std::vector<BlockStoreLocation> KnownLocations;
+ KnownLocations.reserve(m_LocationMap.size());
for (const auto& Entry : m_LocationMap)
{
const BlockStoreDiskLocation& Location = Entry.second;
- m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_seq_cst);
- KnownBlocks.insert(Location.GetBlockIndex());
+ m_TotalSize.fetch_add(Location.GetSize(), std::memory_order::relaxed);
+ KnownLocations.push_back(Location.Get(m_PayloadAlignment));
}
- if (std::filesystem::is_directory(m_BlocksBasePath))
- {
- std::vector<std::filesystem::path> FoldersToScan;
- FoldersToScan.push_back(m_BlocksBasePath);
- size_t FolderOffset = 0;
- while (FolderOffset < FoldersToScan.size())
- {
- for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
- {
- if (Entry.is_directory())
- {
- FoldersToScan.push_back(Entry.path());
- continue;
- }
- if (Entry.is_regular_file())
- {
- const std::filesystem::path Path = Entry.path();
- if (Path.extension() != DataExtension)
- {
- continue;
- }
- std::string FileName = Path.stem().string();
- uint32_t BlockIndex;
- bool OK = ParseHexNumber(FileName, BlockIndex);
- if (!OK)
- {
- continue;
- }
- if (!KnownBlocks.contains(BlockIndex))
- {
- // Log removing unreferenced block
- // Clear out unused blocks
- ZEN_INFO("removing unused block for '{}' at '{}'", m_ContainerBaseName, Path);
- std::error_code Ec;
- std::filesystem::remove(Path, Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
- }
- continue;
- }
- Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
- BlockFile->Open();
- m_ChunkBlocks[BlockIndex] = BlockFile;
- }
- }
- ++FolderOffset;
- }
- }
- else
- {
- CreateDirectories(m_BlocksBasePath);
- }
+ m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0))
{
@@ -2195,7 +1619,7 @@ TEST_CASE("compactcas.legacyconversion")
Gc.CollectGarbage(GcCtx);
}
- std::filesystem::path BlockPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
+ std::filesystem::path BlockPath = BlockStore::GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test");
std::filesystem::rename(BlockPath, LegacyDataPath);
@@ -2463,7 +1887,13 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
{
ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
CHECK(Cas.HaveChunk(ChunkHash));
- CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ if (ChunkHash != IoHash::HashBuffer(Cas.FindChunk(ChunkHash)))
+ {
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ CHECK(Buffer);
+ IoHash BufferHash = IoHash::HashBuffer(Buffer);
+ CHECK(ChunkHash == BufferHash);
+ }
WorkCompleted.fetch_add(1);
});
}