diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-01 22:34:31 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-01 22:34:31 +0200 |
| commit | be12749e0adde39d47875d3c4d2136dbcffbcb3d (patch) | |
| tree | 9cb340f4b66ea30472f1fea4ac130bd69786ee53 /zenserver/cache/structuredcachestore.cpp | |
| parent | threading issues resolved (diff) | |
| download | zen-be12749e0adde39d47875d3c4d2136dbcffbcb3d.tar.xz zen-be12749e0adde39d47875d3c4d2136dbcffbcb3d.zip | |
collectgarbage for compactcas and structured cache uses shared implementation
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 628 |
1 files changed, 161 insertions, 467 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 53a479edb..d313cd0c2 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -127,25 +127,9 @@ namespace { static_assert(sizeof(LegacyDiskIndexEntry) == 36); - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".slog"; - const char* DataExtension = ".sobs"; - - std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) - { - ExtendablePathBuilder<256> Path; - - char BlockHexString[9]; - ToHexNumber(BlockIndex, BlockHexString); - - Path.Append(BlocksBasePath); - Path.AppendSeparator(); - Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); - Path.AppendSeparator(); - Path.Append(BlockHexString); - Path.Append(DataExtension); - return Path.ToPath(); - } + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + const char* LegacyDataExtension = ".sobs"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { @@ -169,7 +153,7 @@ namespace { std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir) { - return BucketDir / (std::string("zen") + DataExtension); + return BucketDir / (std::string("zen") + LegacyDataExtension); } std::vector<DiskIndexEntry> MakeDiskIndexEntries(const std::unordered_map<IoHash, DiskLocation>& MovedChunks, @@ -718,8 +702,6 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() std::vector<DiskIndexEntry> Entries; { - RwLock::SharedLockScope __(m_InsertLock); - RwLock::SharedLockScope ___(m_IndexLock); Entries.resize(m_Index.size()); uint64_t EntryIndex = 0; @@ -896,7 +878,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) }); uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + while (std::filesystem::exists(BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) { ++WriteBlockIndex; } @@ -1083,7 +1065,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) } LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex); CreateDirectories(BlockPath.parent_path()); BlockFile.Close(); std::filesystem::rename(LegacyDataPath, BlockPath); @@ -1152,7 +1134,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) BlockRanges.push_back(BlockRange); WriteBlockIndex++; - while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + while (std::filesystem::exists(BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) { ++WriteBlockIndex; } @@ -1191,7 +1173,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) NiceTimeSpanMs(ETA)); } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); + std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); BlockStoreFile ChunkBlock(BlockPath); ChunkBlock.Create(BlockRange.BlockSize); uint64_t Offset = 0; @@ -1299,7 +1281,8 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - std::unordered_set<uint32_t> KnownBlocks; + std::vector<BlockStoreLocation> KnownLocations; + KnownLocations.reserve(m_Index.size()); for (const auto& Entry : m_Index) { const DiskLocation& Location = Entry.second.Location; @@ -1308,62 +1291,11 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is { continue; } - KnownBlocks.insert(Location.GetBlockLocation(m_PayloadAlignment).BlockIndex); + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); + KnownLocations.push_back(BlockLocation); } - if (std::filesystem::is_directory(m_BlocksBasePath)) - { - std::vector<std::filesystem::path> FoldersToScan; - FoldersToScan.push_back(m_BlocksBasePath); - size_t FolderOffset = 0; - while (FolderOffset < FoldersToScan.size()) - { - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) - { - if (Entry.is_directory()) - { - FoldersToScan.push_back(Entry.path()); - continue; - } - if (Entry.is_regular_file()) - { - const std::filesystem::path Path = Entry.path(); - if (Path.extension() != DataExtension) - { - continue; - } - std::string FileName = Path.stem().string(); - uint32_t BlockIndex; - bool OK = ParseHexNumber(FileName, BlockIndex); - if (!OK) - { - continue; - } - if (!KnownBlocks.contains(BlockIndex)) - { - // Log removing unreferenced block - // Clear out unused blocks - ZEN_INFO("removing unused block for '{}' at '{}'", m_BucketDir / m_BucketName, Path); - std::error_code Ec; - std::filesystem::remove(Path, Ec); - if (Ec) - { - ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); - } - continue; - } - Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); - BlockFile->Open(); - m_ChunkBlocks[BlockIndex] = BlockFile; - } - } - ++FolderOffset; - } - } - else - { - CreateDirectories(m_BlocksBasePath); - } + m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0)) { @@ -1390,14 +1322,14 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H bool ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) { - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); + + Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); + if (!ChunkBlock) { return false; } - const BlockStoreLocation& Location = Loc.GetBlockLocation(m_PayloadAlignment); - Ref<BlockStoreFile> ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; - OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size); OutValue.Value.SetContentType(Loc.GetContentType()); @@ -1437,15 +1369,17 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal { IndexEntry& Entry = It.value(); Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + DiskLocation Location = Entry.Location; + _.ReleaseNow(); - if (GetInlineCacheValue(Entry.Location, OutValue)) + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + return GetStandaloneCacheValue(Location, HashKey, OutValue); + } + if (GetInlineCacheValue(Location, OutValue)) { return true; } - - _.ReleaseNow(); - - return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue); } return false; @@ -1463,84 +1397,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& { return PutStandaloneCacheValue(HashKey, Value); } - - // Small object put - - uint8_t EntryFlags = 0; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - uint64_t ChunkSize = Value.Value.Size(); - - uint32_t WriteBlockIndex; - Ref<BlockStoreFile> WriteBlock; - uint64_t InsertOffset; - - { - RwLock::ExclusiveLockScope _(m_InsertLock); - - WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - bool IsWriting = m_WriteBlock != nullptr; - if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > MaxBlockSize) - { - if (m_WriteBlock) - { - m_WriteBlock = nullptr; - } - { - RwLock::ExclusiveLockScope __(m_IndexLock); - if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) - { - throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BucketDir / m_BucketName)); - } - WriteBlockIndex += IsWriting ? 1 : 0; - while (m_ChunkBlocks.contains(WriteBlockIndex)) - { - WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - m_WriteBlock = new BlockStoreFile(BlockPath); - m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - } - m_CurrentInsertOffset = 0; - m_WriteBlock->Create(MaxBlockSize); - } - InsertOffset = m_CurrentInsertOffset; - m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); - WriteBlock = m_WriteBlock; - } - - DiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment, EntryFlags); - const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; - - WriteBlock->Write(Value.Value.Data(), ChunkSize, InsertOffset); - m_SlogFile.Append(DiskIndexEntry); - - m_TotalSize.fetch_add(ChunkSize, std::memory_order_seq_cst); - { - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - IndexEntry& Entry = It.value(); - Entry.Location = Location; - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - } - else - { - m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); - } - } + PutInlineCacheValue(HashKey, Value); } void @@ -1555,21 +1412,10 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { - { - RwLock::ExclusiveLockScope _(m_InsertLock); - if (m_CurrentInsertOffset > 0) - { - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - m_WriteBlock = nullptr; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - m_CurrentInsertOffset = 0; - } - } - RwLock::SharedLockScope _(m_IndexLock); + m_BlockStore.Flush(); + RwLock::SharedLockScope _(m_IndexLock); MakeIndexSnapshot(); - SaveManifest(); } @@ -1615,20 +1461,22 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) ZenCacheValue Value; - if (GetInlineCacheValue(Loc, Value)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - // Validate contents + if (GetInlineCacheValue(Loc, Value)) + { + // Validate contents + continue; + } } else if (GetStandaloneCacheValue(Loc, HashKey, Value)) { // Note: we cannot currently validate contents since we don't // have a content hash! + continue; } - else - { - // Value not found - BadKeys.push_back(HashKey); - } + // Value not found + BadKeys.push_back(HashKey); } } @@ -1726,18 +1574,23 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); - if (!GetInlineCacheValue(Loc, CacheValue)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - GetStandaloneCacheValue(Loc, Key, CacheValue); + if (!GetStandaloneCacheValue(Loc, Key, CacheValue)) + { + continue; + } + } + else if (!GetInlineCacheValue(Loc, CacheValue)) + { + continue; } } - if (CacheValue.Value) - { - ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); - CbObject Obj(SharedBuffer{CacheValue.Value}); - Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); - } + ZEN_ASSERT(CacheValue.Value); + ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); + CbObject Obj(SharedBuffer{CacheValue.Value}); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } } @@ -1797,10 +1650,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_SlogFile.Flush(); - IndexMap Index; - size_t BlockCount; - uint64_t ExcludeBlockIndex = 0x800000000ull; - std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName); std::vector<IoHash> DeleteCacheKeys; DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); @@ -1816,30 +1665,27 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); return; } + + IndexMap Index; + BlockStore::ReclaimSnapshotState BlockStoreState; { - RwLock::SharedLockScope __(m_InsertLock); - RwLock::SharedLockScope ___(m_IndexLock); + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); { - Stopwatch Timer; - const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); if (m_Index.empty()) { ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); return; } - if (m_WriteBlock) - { - ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - } - __.ReleaseNow(); + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); } SaveManifest(); - Index = m_Index; - BlockCount = m_ChunkBlocks.size(); + Index = m_Index; for (const IoHash& Key : DeleteCacheKeys) { @@ -1936,295 +1782,102 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { return; } - std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; - std::vector<std::vector<IoHash>> KeepChunks; - std::vector<std::vector<IoHash>> DeleteChunks; - - BlockIndexToChunkMapIndex.reserve(BlockCount); - KeepChunks.reserve(BlockCount); - DeleteChunks.reserve(BlockCount); - size_t GuesstimateCountPerBlock = TotalChunkHashes.size() / BlockCount / 2; - - uint64_t DeleteCount = 0; - - uint64_t NewTotalSize = 0; + TotalChunkCount = TotalChunkHashes.size(); - std::unordered_set<IoHash, IoHash::Hasher> Expired; - Expired.insert(DeleteCacheKeys.begin(), DeleteCacheKeys.end()); + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<size_t> KeepChunkIndexes; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = Index.find(ChunkHash); - const DiskLocation& Location = KeyIt->second.Location; - BlockStoreLocation BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); - - uint32_t BlockIndex = BlockLocation.BlockIndex; - - if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex) - { - return; - } - - auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex); - size_t ChunkMapIndex = 0; - if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) - { - ChunkMapIndex = KeepChunks.size(); - BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; - KeepChunks.resize(ChunkMapIndex + 1); - KeepChunks.back().reserve(GuesstimateCountPerBlock); - DeleteChunks.resize(ChunkMapIndex + 1); - DeleteChunks.back().reserve(GuesstimateCountPerBlock); - } - else - { - ChunkMapIndex = BlockIndexPtr->second; - } + auto KeyIt = Index.find(ChunkHash); + const DiskLocation& DiskLocation = KeyIt->second.Location; + BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; if (Keep) { - std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex]; - ChunkMap.push_back(ChunkHash); - NewTotalSize += BlockLocation.Size; - } - else - { - std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; - ChunkMap.push_back(ChunkHash); - DeleteCount++; + KeepChunkIndexes.push_back(ChunkIndex); } }); - std::unordered_set<uint32_t> BlocksToReWrite; - BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); - for (const auto& Entry : BlockIndexToChunkMapIndex) - { - uint32_t BlockIndex = Entry.first; - size_t ChunkMapIndex = Entry.second; - const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; - if (ChunkMap.empty()) - { - continue; - } - BlocksToReWrite.insert(BlockIndex); - } + size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", m_BucketDir / m_BucketName, DeleteCount, - NiceBytes(TotalSize - NewTotalSize), + 0, // NiceBytes(TotalSize - NewTotalSize), TotalChunkCount, NiceBytes(TotalSize)); return; } - auto AddToDeleted = [this, &Index, &DeletedCount, &DeletedSize](const std::vector<IoHash>& DeletedEntries) { - for (const IoHash& ChunkHash : DeletedEntries) - { - const DiskLocation& Location = Index[ChunkHash].Location; - ZEN_ASSERT(!Location.IsFlagSet(DiskLocation::kStandaloneFile)); - DeletedSize += Index[ChunkHash].Location.GetBlockLocation(m_PayloadAlignment).Size; - } - DeletedCount += DeletedEntries.size(); - }; - - // Move all chunks in blocks that have chunks removed to new blocks - - Ref<BlockStoreFile> NewBlockFile; - uint64_t WriteOffset = 0; - uint32_t NewBlockIndex = 0; - - auto UpdateLocations = [this](const std::span<DiskIndexEntry>& Entries) { - for (const DiskIndexEntry& Entry : Entries) - { - if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + std::vector<IoHash> DeletedChunks; + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [this, &DeletedChunks, &ChunkIndexToChunkHash, &Index, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( + uint32_t, + const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, + const std::vector<size_t>& RemovedChunks) { + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) { - auto KeyIt = m_Index.find(Entry.Key); - uint64_t ChunkSize = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment).Size; - m_TotalSize.fetch_sub(ChunkSize, std::memory_order_seq_cst); - m_Index.erase(KeyIt); - continue; + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; + LogEntries.push_back( + {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); + } + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; + LogEntries.push_back({.Key = ChunkHash, + .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), + m_PayloadAlignment, + OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); + DeletedChunks.push_back(ChunkHash); } - m_Index[Entry.Key].Location = Entry.Location; - } - }; - - std::unordered_map<IoHash, DiskLocation> MovedBlockChunks; - for (uint32_t BlockIndex : BlocksToReWrite) - { - const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; - - Ref<BlockStoreFile> OldBlockFile; - { - RwLock::SharedLockScope _i(m_IndexLock); - OldBlockFile = m_ChunkBlocks[BlockIndex]; - } - const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex]; - if (KeepMap.empty()) - { - const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; - std::vector<DiskIndexEntry> LogEntries = MakeDiskIndexEntries({}, DeleteMap); m_SlogFile.Append(LogEntries); m_SlogFile.Flush(); { - RwLock::ExclusiveLockScope _i(m_IndexLock); + RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; - const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + const auto ____ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); - UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex] = nullptr; - } - AddToDeleted(DeleteMap); - ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'", - m_BucketDir / m_BucketName, - BlockIndex, - OldBlockFile->GetPath()); - std::error_code Ec; - OldBlockFile->MarkAsDeleteOnClose(Ec); - if (Ec) - { - ZEN_WARN("Failed to flag file '{}' for deletion, reason: '{}'", OldBlockFile->GetPath(), Ec.message()); - } - continue; - } - - std::vector<uint8_t> Chunk; - for (const IoHash& ChunkHash : KeepMap) - { - auto KeyIt = Index.find(ChunkHash); - const BlockStoreLocation ChunkLocation = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment); - Chunk.resize(ChunkLocation.Size); - OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - - if (!NewBlockFile || (WriteOffset + Chunk.size() > MaxBlockSize)) - { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); - std::vector<DiskIndexEntry> LogEntries = MakeDiskIndexEntries(MovedBlockChunks, {}); - m_SlogFile.Append(LogEntries); - m_SlogFile.Flush(); - - if (NewBlockFile) - { - NewBlockFile->Truncate(WriteOffset); - NewBlockFile->Flush(); - } + for (const DiskIndexEntry& Entry : LogEntries) { - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - UpdateLocations(LogEntries); - if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + if (Entry.Location.GetFlags() & DiskLocation::kTombStone) { - ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", - m_BucketDir / m_BucketName, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return; - } - while (m_ChunkBlocks.contains(NextBlockIndex)) - { - NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - } - std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); - NewBlockFile = new BlockStoreFile(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; - } - - MovedCount += MovedBlockChunks.size(); - MovedBlockChunks.clear(); - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return; - } - if (Space.Free < MaxBlockSize) - { - uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve(); - if (Space.Free + ReclaimedSpace < MaxBlockSize) - { - ZEN_WARN("garbage collect from '{}' FAILED, required disk space {}, free {}", - m_BucketDir / m_BucketName, - MaxBlockSize, - NiceBytes(Space.Free + ReclaimedSpace)); - RwLock::ExclusiveLockScope _l(m_IndexLock); - Stopwatch Timer; - const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - m_ChunkBlocks.erase(NextBlockIndex); - return; + m_Index.erase(Entry.Key); + uint64_t ChunkSize = Entry.Location.GetBlockLocation(m_PayloadAlignment).Size; + m_TotalSize.fetch_sub(ChunkSize); + continue; } - - ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", - m_BucketDir / m_BucketName, - ReclaimedSpace, - NiceBytes(Space.Free + ReclaimedSpace)); + m_Index[Entry.Key].Location = Entry.Location; } - NewBlockFile->Create(MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; } + }); - NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - MovedBlockChunks.emplace(ChunkHash, - DiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, - m_PayloadAlignment, - KeyIt->second.Location.Flags)); - WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); - } - Chunk.clear(); - if (NewBlockFile) - { - NewBlockFile->Truncate(WriteOffset); - NewBlockFile->Flush(); - NewBlockFile = {}; - } - - const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; - std::vector<DiskIndexEntry> LogEntries = MakeDiskIndexEntries(MovedBlockChunks, DeleteMap); - m_SlogFile.Append(LogEntries); - m_SlogFile.Flush(); - { - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex] = nullptr; - } - MovedCount += MovedBlockChunks.size(); - AddToDeleted(DeleteMap); - MovedBlockChunks.clear(); - - ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'", - m_BucketDir / m_BucketName, - BlockIndex, - OldBlockFile->GetPath()); - std::error_code Ec; - OldBlockFile->MarkAsDeleteOnClose(Ec); - if (Ec) - { - ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); - } - OldBlockFile = nullptr; - } + GcCtx.DeletedCas(DeletedChunks); } void @@ -2367,6 +2020,47 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c m_TotalSize.fetch_add(Loc.Size(), std::memory_order_seq_cst); } +void +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) +{ + uint8_t EntryFlags = 0; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + uint64_t ChunkSize = Value.Value.Size(); + + m_BlockStore.WriteChunk(Value.Value.Data(), + ChunkSize, + m_PayloadAlignment, + [this, &HashKey, EntryFlags](const BlockStoreLocation& BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; + m_SlogFile.Append(DiskIndexEntry); + m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order_seq_cst); + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + // TODO: should check if write is idempotent and bail out if it is? + // this would requiring comparing contents on disk unless we add a + // content hash to the index entry + IndexEntry& Entry = It.value(); + Entry.Location = Location; + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + } + else + { + m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); + } + }); +} + ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) @@ -3026,7 +2720,7 @@ TEST_CASE("z$.legacyconversion") std::filesystem::path BucketDir = TempDir.Path() / Bucket; std::filesystem::path BlocksBaseDir = BucketDir / "blocks"; - std::filesystem::path CasPath = GetBlockPath(BlocksBaseDir, 1); + std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1); std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir); std::filesystem::remove(LegacyDataPath); std::filesystem::rename(CasPath, LegacyDataPath); |