From 31dd0f8906aa5a27b8c453c72f6d10964a3be9eb Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 6 Apr 2022 15:46:29 +0200 Subject: structured cache with block store --- zenserver/cache/structuredcachestore.cpp | 1977 ++++++++++++++++++++++++++---- 1 file changed, 1758 insertions(+), 219 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 738e4c1fd..c5ccef523 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -14,13 +14,13 @@ #include #include #include -#include -#include #include #include #include #include +#include + #if ZEN_PLATFORM_WINDOWS # include #endif @@ -30,10 +30,220 @@ ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END +#if ZEN_WITH_TESTS +# include +# include +# include +# include +#endif + ////////////////////////////////////////////////////////////////////////// +#pragma pack(push) +#pragma pack(1) + namespace zen { +namespace { + +#pragma pack(push) +#pragma pack(1) + + struct CacheBucketIndexHeader + { + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; + + static_assert(sizeof(CacheBucketIndexHeader) == 32); + + struct LegacyDiskLocation + { + inline LegacyDiskLocation() = default; + + inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) + : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) + , LowerSize(ValueSize & 0xFFFFffff) + , IndexDataSize(IndexSize) + { + } + + static const 
uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize) + static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; + static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file + static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary + static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value + static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format + + static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } + + inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t Size() const { return LowerSize; } + inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } + inline ZenContentType GetContentType() const + { + ZenContentType ContentType = ZenContentType::kBinary; + + if (IsFlagSet(LegacyDiskLocation::kStructured)) + { + ContentType = ZenContentType::kCbObject; + } + + if (IsFlagSet(LegacyDiskLocation::kCompressed)) + { + ContentType = ZenContentType::kCompressedBinary; + } + + return ContentType; + } + inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; } + + private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; + }; + + struct LegacyDiskIndexEntry + { + IoHash Key; + LegacyDiskLocation Location; + }; + +#pragma pack(pop) + + static_assert(sizeof(LegacyDiskIndexEntry) == 36); + + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + const char* DataExtension = ".sobs"; + + std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) + { + ExtendablePathBuilder<256> Path; + + char BlockHexString[9]; + ToHexNumber(BlockIndex, BlockHexString); + + 
Path.Append(BlocksBasePath); + Path.AppendSeparator(); + Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); + Path.AppendSeparator(); + Path.Append(BlockHexString); + Path.Append(DataExtension); + return Path.ToPath(); + } + + std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + IndexExtension); + } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + ".tmp" + IndexExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + LogExtension); + } + + std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir) + { + return BucketDir / (std::string("zen") + LogExtension); + } + + std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir) + { + return BucketDir / (std::string("zen") + DataExtension); + } + + std::vector MakeDiskIndexEntries(const std::unordered_map& MovedChunks, + const std::vector& DeletedChunks) + { + std::vector result; + result.reserve(MovedChunks.size()); + for (const auto& MovedEntry : MovedChunks) + { + result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second}); + } + for (const IoHash& ChunkHash : DeletedChunks) + { + DiskLocation Location; + Location.Flags |= DiskLocation::kTombStone; + result.push_back({.Key = ChunkHash, .Location = Location}); + } + return result; + } + + bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured | + LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed)) + { + OutReason = 
fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString()); + return false; + } + // A tombstone only marks a deleted key, so it is accepted without a size check; live entries must have a non-zero size (same rule as ValidateEntry) + if (Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + + bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.GetFlags() & + ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + +} // namespace + namespace fs = std::filesystem; static CbObject @@ -60,9 +270,9 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) } ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) -: GcStorage(Gc) +: m_RootDir(RootDir) +, GcStorage(Gc) , GcContributor(Gc) -, m_RootDir(RootDir) , m_DiskLayer(RootDir) { ZEN_INFO("initializing structured cache at '{}'", RootDir); @@ -425,6 +635,8 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { using namespace std::literals; + m_BlocksBasePath = BucketDir / "blocks"; + CreateDirectories(BucketDir); std::filesystem::path ManifestPath{BucketDir / "zen_manifest"}; @@ -470,48 +682,694 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } void 
-ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { - m_BucketDir = BucketDir; + ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([this, &EntryCount, &Timer] { + ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", + m_BucketDir / m_BucketName, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - uint64_t MaxFileOffset = 0; - uint64_t InvalidEntryCount = 0; - m_SobsCursor = 0; - m_TotalSize = 0; + namespace fs = std::filesystem; - m_Index.clear(); + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - std::filesystem::path SobsPath{BucketDir / "zen.sobs"}; - std::filesystem::path SlogPath{BucketDir / "zen.slog"}; + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, STmpIndexPath); + } - m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); - m_SlogFile.Open(SlogPath, IsNew ? 
CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + try + { + m_SlogFile.Flush(); + + // Write the current state of the location map to a new index state + uint64_t LogCount = 0; + std::vector Entries; - m_SlogFile.Replay( - [&](const DiskIndexEntry& Entry) { - if (Entry.Key == IoHash::Zero) + { + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_IndexLock); + Entries.resize(m_Index.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_Index) { - ++InvalidEntryCount; + DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second.Location; } - else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + + LogCount = m_SlogFile.GetLogCount(); + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow(m_PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(IndexPath); + fs::rename(STmpIndexPath, IndexPath); + } + } + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadIndexFile() +{ + std::vector Entries; + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + if (std::filesystem::is_regular_file(IndexPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([this, &Entries, &Timer] { + ZEN_INFO("read store '{}' index 
containing #{} entries in {}", + m_BucketDir / m_BucketName, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + // Number of whole DiskIndexEntry records that fit after the header; caps Header.EntryCount so the entry read below stays inside the file + uint64_t ExpectedEntryCount = (Size - sizeof(CacheBucketIndexHeader)) / sizeof(DiskIndexEntry); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) && + (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) { - m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + m_PayloadAlignment = Header.PayloadAlignment; + + std::string InvalidEntryReason; + for (const DiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + + return Header.LogPosition; } else { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); - m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + } + } + return 0; +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) +{ + std::vector Entries; + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + if (std::filesystem::is_regular_file(LogPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([LogPath, &Entries, &Timer] 
{ + ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + uint64_t ReadCount = EntryCount - SkipEntryCount; + m_Index.reserve(ReadCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + m_Index.insert_or_assign(Record.Key, IndexEntry(Record.Location, GcClock::TickCount())); + }, + SkipEntryCount); + if (InvalidEntryCount) + { + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); } - MaxFileOffset = std::max(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); - }, - 0); + } + } + return 0; +}; + +uint64_t +ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) +{ + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); + + if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) + { + return 0; + } + + ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName); + + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); + uint64_t MigratedChunkCount = 0; + uint32_t MigratedBlockCount = 0; + Stopwatch MigrationTimer; + uint64_t TotalSize = 0; + const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { + ZEN_INFO("migrated store 
'{}' to #{} chunks in #{} blocks in {} ({})", + m_BucketDir / m_BucketName, + MigratedChunkCount, + MigratedBlockCount, + NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), + NiceBytes(TotalSize)); + }); + + uint32_t WriteBlockIndex = 0; + while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return 0; + } + + if (Space.Free < MaxBlockSize) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", + m_BucketDir / m_BucketName, + MaxBlockSize, + NiceBytes(Space.Free)); + return 0; + } + + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + + std::unordered_map LegacyDiskIndex; + uint64_t InvalidEntryCount = 0; + + TCasLogFile LegacyCasLog; + LegacyCasLog.Open(LegacyLogPath, CleanSource ? 
CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); + { + Stopwatch Timer; + const auto __ = MakeGuard([LegacyLogPath, &LegacyDiskIndex, &Timer] { + ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", + LegacyLogPath, + LegacyDiskIndex.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + if (LegacyCasLog.Initialize()) + { + LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); + LegacyCasLog.Replay( + [&](const LegacyDiskIndexEntry& Record) { + if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) + { + LegacyDiskIndex.erase(Record.Key); + return; + } + std::string InvalidEntryReason; + if (!ValidateLegacyEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (m_Index.contains(Record.Key)) + { + return; + } + LegacyDiskIndex[Record.Key] = Record; + }, + 0); + + std::vector BadEntries; + uint64_t BlockFileSize = BlockFile.FileSize(); + for (const auto& Entry : LegacyDiskIndex) + { + const LegacyDiskIndexEntry& Record(Entry.second); + if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + continue; + } + if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize) + { + continue; + } + ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); + BadEntries.push_back(Entry.first); + } + for (const IoHash& BadHash : BadEntries) + { + LegacyDiskIndex.erase(BadHash); + } + InvalidEntryCount += BadEntries.size(); + } + } if (InvalidEntryCount) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath); + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); + } + + if (LegacyDiskIndex.empty()) + { + LegacyCasLog.Close(); + BlockFile.Close(); + if (CleanSource) + { + // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find + // a manifest and crashes on startup if 
they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + } + return 0; + } + + uint64_t BlockChunkCount = 0; + uint64_t BlockTotalSize = 0; + for (const auto& Entry : LegacyDiskIndex) + { + const LegacyDiskIndexEntry& Record(Entry.second); + if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + continue; + } + BlockChunkCount++; + BlockTotalSize += Record.Location.Size(); + } + + uint64_t RequiredDiskSpace = BlockTotalSize + ((m_PayloadAlignment - 1) * BlockChunkCount); + uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; + if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", + m_BucketDir / m_BucketName, + MaxRequiredBlockCount, + BlockStoreDiskLocation::MaxBlockIndex); + return 0; + } + + constexpr const uint64_t DiskReserve = 1ul << 28; + + if (CleanSource) + { + if (Space.Free < (MaxBlockSize + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_BucketDir / m_BucketName, + NiceBytes(MaxBlockSize + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } + else + { + if (Space.Free < (RequiredDiskSpace + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_BucketDir / m_BucketName, + NiceBytes(RequiredDiskSpace + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } + + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + CreateDirectories(LogPath.parent_path()); + TCasLogFile CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + if (CleanSource && (MaxRequiredBlockCount < 
2)) + { + std::vector LogEntries; + LogEntries.reserve(LegacyDiskIndex.size()); + + // We can use the block as is, just move it and add the blocks to our new log + for (auto& Entry : LegacyDiskIndex) + { + const LegacyDiskIndexEntry& Record(Entry.second); + + DiskLocation NewLocation; + uint8_t Flags = 0xff & (Record.Location.Flags() >> 56); + if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + NewLocation = DiskLocation(Record.Location.Size(), Flags); + } + else + { + BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.Offset(), Record.Location.Size()); + NewLocation = DiskLocation(NewChunkLocation, m_PayloadAlignment, Flags); + } + LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + CreateDirectories(BlockPath.parent_path()); + BlockFile.Close(); + std::filesystem::rename(LegacyDataPath, BlockPath); + CasLog.Append(LogEntries); + for (const DiskIndexEntry& Entry : LogEntries) + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + } + else + { + std::vector ChunkHashes; + ChunkHashes.reserve(LegacyDiskIndex.size()); + for (const auto& Entry : LegacyDiskIndex) + { + ChunkHashes.push_back(Entry.first); + } + + std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { + auto LhsKeyIt = LegacyDiskIndex.find(Lhs); + auto RhsKeyIt = LegacyDiskIndex.find(Rhs); + return LhsKeyIt->second.Location.Offset() < RhsKeyIt->second.Location.Offset(); + }); + + uint64_t BlockSize = 0; + uint64_t BlockOffset = 0; + std::vector NewLocations; + struct BlockData + { + std::vector> Chunks; + uint64_t BlockOffset; + uint64_t BlockSize; + uint32_t BlockIndex; + }; + + std::vector BlockRanges; + std::vector> Chunks; + BlockRanges.reserve(MaxRequiredBlockCount); + for (const IoHash& ChunkHash : ChunkHashes) + { 
+ const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& LegacyChunkLocation = LegacyEntry.Location; + + if (LegacyChunkLocation.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + // For standalone files we just store the chunk hash an use the size from the legacy index as is + Chunks.push_back({ChunkHash, {}}); + continue; + } + + uint64_t ChunkOffset = LegacyChunkLocation.Offset(); + uint64_t ChunkSize = LegacyChunkLocation.Size(); + uint64_t ChunkEnd = ChunkOffset + ChunkSize; + + if (BlockSize == 0) + { + BlockOffset = ChunkOffset; + } + if ((ChunkEnd - BlockOffset) > MaxBlockSize) + { + BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; + BlockRange.Chunks.swap(Chunks); + BlockRanges.push_back(BlockRange); + + WriteBlockIndex++; + while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + BlockOffset = ChunkOffset; + BlockSize = 0; + } + BlockSize = RoundUp(BlockSize, m_PayloadAlignment); + BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; + Chunks.push_back({ChunkHash, ChunkLocation}); + BlockSize = ChunkEnd - BlockOffset; + } + if (BlockSize > 0) + { + BlockRanges.push_back( + {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); + } + Stopwatch WriteBlockTimer; + + std::reverse(BlockRanges.begin(), BlockRanges.end()); + std::vector Buffer(1 << 28); + for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) + { + const BlockData& BlockRange = BlockRanges[Idx]; + if (Idx > 0) + { + uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; + uint64_t Completed = BlockOffset + BlockSize - Remaining; + uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; + + ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", + m_BucketDir / 
m_BucketDir, + Idx, + BlockRanges.size(), + NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), + NiceBytes(BlockOffset + BlockSize), + NiceTimeSpanMs(ETA)); + } + + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); + BlockStoreFile ChunkBlock(BlockPath); + ChunkBlock.Create(BlockRange.BlockSize); + uint64_t Offset = 0; + while (Offset < BlockRange.BlockSize) + { + uint64_t Size = BlockRange.BlockSize - Offset; + if (Size > Buffer.size()) + { + Size = Buffer.size(); + } + BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); + ChunkBlock.Write(Buffer.data(), Size, Offset); + Offset += Size; + } + ChunkBlock.Truncate(Offset); + ChunkBlock.Flush(); + + std::vector LogEntries; + LogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; + + DiskLocation NewLocation; + uint8_t Flags = 0xff & (LegacyEntry.Location.Flags() >> 56); + if (LegacyEntry.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + NewLocation = DiskLocation(LegacyEntry.Location.Size(), Flags); + } + else + { + NewLocation = DiskLocation(Entry.second, m_PayloadAlignment, Flags); + } + LogEntries.push_back({.Key = Entry.first, .Location = NewLocation}); + } + CasLog.Append(LogEntries); + for (const DiskIndexEntry& Entry : LogEntries) + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + + if (CleanSource) + { + std::vector LegacyLogEntries; + LegacyLogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + LegacyLogEntries.push_back( + {.Key = Entry.first, .Location = LegacyDiskLocation(0, 0, 0, LegacyDiskLocation::kTombStone)}); + } + LegacyCasLog.Append(LegacyLogEntries); + BlockFile.SetFileSize(BlockRange.BlockOffset); + } + } + } + BlockFile.Close(); + 
LegacyCasLog.Close(); + CasLog.Close(); + + if (CleanSource) + { + // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find + // a manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); } + return MigratedChunkCount; +} + +void +ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +{ + m_BucketDir = BucketDir; + + m_TotalSize = 0; + + m_Index.clear(); + + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + + if (IsNew) + { + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); + fs::remove(LegacyLogPath); + fs::remove(LegacyDataPath); + fs::remove(LogPath); + fs::remove(IndexPath); + fs::remove_all(m_BlocksBasePath); + } + + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); + uint64_t LegacyLogEntryCount = MigrateLegacyData(true); - m_SobsCursor = (MaxFileOffset + 15) & ~15; + CreateDirectories(m_BucketDir); + + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + std::unordered_set KnownBlocks; + for (const auto& Entry : m_Index) + { + const DiskLocation& Location = Entry.second.Location; + m_TotalSize.fetch_add(Location.Size(), std::memory_order_release); + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + KnownBlocks.insert(Location.GetBlockLocation(m_PayloadAlignment).BlockIndex); + } + + if (std::filesystem::is_directory(m_BlocksBasePath)) + { + std::vector FoldersToScan; + FoldersToScan.push_back(m_BlocksBasePath); + size_t FolderOffset = 0; + while 
(FolderOffset < FoldersToScan.size()) + { + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) + { + if (Entry.is_directory()) + { + FoldersToScan.push_back(Entry.path()); + continue; + } + if (Entry.is_regular_file()) + { + const std::filesystem::path Path = Entry.path(); + if (Path.extension() != DataExtension) + { + continue; + } + std::string FileName = Path.stem().string(); + uint32_t BlockIndex; + bool OK = ParseHexNumber(FileName, BlockIndex); + if (!OK) + { + continue; + } + if (!KnownBlocks.contains(BlockIndex)) + { + // Log removing unreferenced block + // Clear out unused blocks + ZEN_INFO("removing unused block for '{}' at '{}'", m_BucketDir / m_BucketName, Path); + std::error_code Ec; + std::filesystem::remove(Path, Ec); + if (Ec) + { + ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); + } + continue; + } + Ref BlockFile = new BlockStoreFile(Path); + BlockFile->Open(); + m_ChunkBlocks[BlockIndex] = BlockFile; + } + } + ++FolderOffset; + } + } + else + { + CreateDirectories(m_BlocksBasePath); + } + + if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0)) + { + MakeIndexSnapshot(); + } + // TODO: should validate integrity of container files here } void @@ -537,7 +1395,10 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen return false; } - OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size()); + const BlockStoreLocation& Location = Loc.GetBlockLocation(m_PayloadAlignment); + Ref ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; + + OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size); OutValue.Value.SetContentType(Loc.GetContentType()); return true; @@ -562,23 +1423,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, return false; } -void -ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc, - const 
IoHash& HashKey, - const fs::path& Path, - std::error_code& Ec) -{ - ZEN_DEBUG("deleting standalone cache file '{}'", Path); - fs::remove(Path, Ec); - - if (!Ec) - { - m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}}); - m_Index.erase(HashKey); - m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); - } -} - bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -619,54 +1463,91 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& { return PutStandaloneCacheValue(HashKey, Value); } - else + + // Small object put + + uint8_t EntryFlags = 0; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { - // Small object put + EntryFlags |= DiskLocation::kCompressed; + } + + uint64_t ChunkSize = Value.Value.Size(); + + uint32_t WriteBlockIndex; + Ref WriteBlock; + uint64_t InsertOffset; - uint64_t EntryFlags = 0; + { + RwLock::ExclusiveLockScope _(m_InsertLock); - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + bool IsWriting = m_WriteBlock != nullptr; + if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > MaxBlockSize) { - EntryFlags |= DiskLocation::kCompressed; + if (m_WriteBlock) + { + m_WriteBlock = nullptr; + } + { + RwLock::ExclusiveLockScope __(m_IndexLock); + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BucketDir / m_BucketName)); + } + WriteBlockIndex += IsWriting ? 
1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + m_WriteBlock = new BlockStoreFile(BlockPath); + m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + } + m_CurrentInsertOffset = 0; + m_WriteBlock->Create(MaxBlockSize); } + InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); + WriteBlock = m_WriteBlock; + } - RwLock::ExclusiveLockScope _(m_IndexLock); - - DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags); + DiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment, EntryFlags); + const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; - m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16); + WriteBlock->Write(Value.Value.Data(), ChunkSize, InsertOffset); + m_SlogFile.Append(DiskIndexEntry); - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - m_Index.insert({HashKey, {Loc, GcClock::TickCount()}}); - } - else + m_TotalSize.fetch_add(ChunkSize, std::memory_order::relaxed); + { + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) { // TODO: should check if write is idempotent and bail out if it is? 
// this would requiring comparing contents on disk unless we add a // content hash to the index entry IndexEntry& Entry = It.value(); - Entry.Location = Loc; + Entry.Location = Location; Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + else + { + m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); + } } } void ZenCacheDiskLayer::CacheBucket::Drop() { + // TODO: close all open files and manage locking // TODO: add error handling - - m_SobsFile.Close(); m_SlogFile.Close(); DeleteDirectories(m_BucketDir); } @@ -674,10 +1555,20 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { + { + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } + } RwLock::SharedLockScope _(m_IndexLock); - m_SobsFile.Flush(); - m_SlogFile.Flush(); + MakeIndexSnapshot(); SaveManifest(); } @@ -754,9 +1645,10 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { // Log a tombstone and delete the in-memory index for the bad entry - const auto It = m_Index.find(BadKey); - const DiskLocation& Location = It->second.Location; - m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}}); + const auto It = m_Index.find(BadKey); + DiskLocation Location = It->second.Location; + Location.Flags |= DiskLocation::kTombStone; + m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); } } @@ -768,8 +1660,9 @@ 
ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); Stopwatch Timer; - const auto Guard = MakeGuard( - [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const auto Guard = MakeGuard([this, &Timer] { + ZEN_INFO("gathered references from '{}' in {}", m_BucketDir / m_BucketName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); const GcClock::TimePoint ExpireTime = GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration(); @@ -820,6 +1713,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } } } + _.ReleaseNow(); ValidKeys.reserve(std::distance(ValidIt, Entries.end())); ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt)); @@ -836,202 +1730,480 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - Flush(); - - RwLock::ExclusiveLockScope _(m_IndexLock); + std::vector ExpiredStandaloneEntries; + + Stopwatch TotalTimer; + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + uint64_t DeletedCount = 0; + uint64_t MovedCount = 0; + + const auto _ = MakeGuard([this, + &TotalTimer, + &WriteBlockTimeUs, + &WriteBlockLongestTimeUs, + &ReadBlockTimeUs, + &ReadBlockLongestTimeUs, + &TotalChunkCount, + &DeletedCount, + &MovedCount, + &DeletedSize, + &OldTotalSize] { + ZEN_INFO( + "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " + "#{} " + "of #{} " + "entires ({}).", + m_BucketDir / m_BucketName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + 
NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedCount, + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + RwLock::SharedLockScope _(m_IndexLock); + SaveManifest(); + }); - const uint64_t OldCount = m_Index.size(); - const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + m_SlogFile.Flush(); - ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir); + IndexMap Index; + size_t BlockCount; + uint64_t ExcludeBlockIndex = 0x800000000ull; - Stopwatch Timer; - const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] { - const uint64_t NewCount = m_Index.size(); - const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed); - ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})", - m_BucketDir, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - OldCount - NewCount, - NiceBytes(OldTotalSize - NewTotalSize), - OldCount, - NiceBytes(OldTotalSize)); - SaveManifest(); + std::span ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName); + std::vector DeleteCacheKeys; + DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); + GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { + if (Keep) + { + return; + } + DeleteCacheKeys.push_back(ChunkHash); }); - - if (m_Index.empty()) + if (DeleteCacheKeys.empty()) { + ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); return; } - - auto AddEntries = [this](std::span Keys, std::vector& OutEntries) { - for (const IoHash& Key : Keys) + { + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_IndexLock); { - if (auto It = m_Index.find(Key); It != m_Index.end()) + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + 
WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.empty()) + { + ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); + return; + } + if (m_WriteBlock) { - OutEntries.push_back(*It); + ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); } + __.ReleaseNow(); } - }; - - std::vector ValidEntries; - std::vector ExpiredEntries; + SaveManifest(); + Index = m_Index; + BlockCount = m_ChunkBlocks.size(); - AddEntries(GcCtx.ValidCacheKeys(m_BucketName), ValidEntries); - AddEntries(GcCtx.ExpiredCacheKeys(m_BucketName), ExpiredEntries); + for (const IoHash& Key : DeleteCacheKeys) + { + if (auto It = Index.find(Key); It != Index.end()) + { + DiskIndexEntry Entry = {.Key = It->first, .Location = It->second.Location}; + if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + { + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); + } + } + } + if (GcCtx.IsDeletionMode()) + { + for (const auto& Entry : ExpiredStandaloneEntries) + { + m_Index.erase(Entry.Key); + } + m_SlogFile.Append(ExpiredStandaloneEntries); + } + } - // Remove all standalone file(s) - // NOTE: This can probably be made asynchronously + if (GcCtx.IsDeletionMode()) { std::error_code Ec; ExtendablePathBuilder<256> Path; - for (const auto& Entry : ExpiredEntries) + for (const auto& Entry : ExpiredStandaloneEntries) { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; + const IoHash& Key = Entry.Key; + const DiskLocation& Loc = Entry.Location; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - Path.Reset(); - BuildPath(Path, Key); + Path.Reset(); + BuildPath(Path, Key); - // NOTE: this will update index and log file - DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec); + { + RwLock::SharedLockScope __(m_IndexLock); + if (m_Index.contains(Key)) + { + // Someone added it back, let the file on 
disk be + ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); + continue; + } + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + fs::remove(Path.c_str(), Ec); + } - if (Ec) + if (Ec) + { + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); + Ec.clear(); + DiskLocation RestoreLocation = Loc; + RestoreLocation.Flags &= ~DiskLocation::kTombStone; + + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", Path.ToUtf8(), Ec.message()); - Ec.clear(); + continue; } + m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); + m_Index.insert({Key, {Loc, GcClock::TickCount()}}); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + continue; } + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedSize += Entry.Location.Size(); + DeletedCount++; } } - if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty()) + TotalChunkCount = Index.size(); + + std::vector TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : Index) { - // Naive GC implementation of small objects. 
Needs enough free - // disk space to store intermediate sob container along side the - // old container + const DiskLocation& Location = Entry.second.Location; - const auto ResetSobStorage = [this, &ValidEntries]() { - m_SobsFile.Close(); - m_SlogFile.Close(); + if (Location.Flags & DiskLocation::kStandaloneFile) + { + continue; + } + TotalChunkHashes.push_back(Entry.first); + } - const bool IsNew = true; - m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); - m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + if (TotalChunkHashes.empty()) + { + return; + } + std::unordered_map BlockIndexToChunkMapIndex; + std::vector> KeepChunks; + std::vector> DeleteChunks; - m_SobsCursor = 0; - m_TotalSize = 0; - m_Index.clear(); + BlockIndexToChunkMapIndex.reserve(BlockCount); + KeepChunks.reserve(BlockCount); + DeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkHashes.size() / BlockCount / 2; - for (const auto& Entry : ValidEntries) - { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; + uint64_t DeleteCount = 0; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - m_SlogFile.Append({.Key = Key, .Location = Loc}); - m_Index.insert({Key, {Loc, GcClock::TickCount()}}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); - } - } - }; + uint64_t NewTotalSize = 0; + + std::unordered_set Expired; + Expired.insert(DeleteCacheKeys.begin(), DeleteCacheKeys.end()); + + GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = Index.find(ChunkHash); + const DiskLocation& Location = KeyIt->second.Location; + BlockStoreLocation BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); + + uint32_t BlockIndex = BlockLocation.BlockIndex; - uint64_t NewContainerSize{}; - for (const auto& Entry : ValidEntries) + if (static_cast(BlockIndex) == ExcludeBlockIndex) { - 
const DiskLocation& Loc = Entry.second.Location; + return; + } - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false) - { - NewContainerSize += (Loc.Size() + sizeof(DiskLocation)); - } + auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex); + size_t ChunkMapIndex = 0; + if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) + { + ChunkMapIndex = KeepChunks.size(); + BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; + KeepChunks.resize(ChunkMapIndex + 1); + KeepChunks.back().reserve(GuesstimateCountPerBlock); + DeleteChunks.resize(ChunkMapIndex + 1); + DeleteChunks.back().reserve(GuesstimateCountPerBlock); + } + else + { + ChunkMapIndex = BlockIndexPtr->second; + } + if (Keep) + { + std::vector& ChunkMap = KeepChunks[ChunkMapIndex]; + ChunkMap.push_back(ChunkHash); + NewTotalSize += BlockLocation.Size; + } + else + { + std::vector& ChunkMap = DeleteChunks[ChunkMapIndex]; + ChunkMap.push_back(ChunkHash); + DeleteCount++; } + }); - if (NewContainerSize == 0) + std::unordered_set BlocksToReWrite; + BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); + for (const auto& Entry : BlockIndexToChunkMapIndex) + { + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const std::vector& ChunkMap = DeleteChunks[ChunkMapIndex]; + if (ChunkMap.empty()) { - ResetSobStorage(); - return; + continue; } + BlocksToReWrite.insert(BlockIndex); + } - const uint64_t DiskSpaceMargin = (256 << 10); + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); + ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_BucketDir / m_BucketName, + DeleteCount, + NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); + return; + } - std::error_code Ec; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec); - if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin) + auto 
AddToDeleted = [this, &Index, &DeletedCount, &DeletedSize](const std::vector& DeletedEntries) { + for (const IoHash& ChunkHash : DeletedEntries) { - ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)", - m_BucketDir, - NiceBytes(NewContainerSize), - NiceBytes(Space.Free)); - return; + const DiskLocation& Location = Index[ChunkHash].Location; + ZEN_ASSERT(!Location.IsFlagSet(DiskLocation::kStandaloneFile)); + DeletedSize += Index[ChunkHash].Location.GetBlockLocation(m_PayloadAlignment).Size; } + DeletedCount += DeletedEntries.size(); + }; - std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"}; - std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"}; + // Move all chunks in blocks that have chunks removed to new blocks - // Copy non expired sob(s) to temporary sob container + Ref NewBlockFile; + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; + auto UpdateLocations = [this](const std::span& Entries) { + for (const DiskIndexEntry& Entry : Entries) { - BasicFile TmpSobs; - TCasLogFile TmpLog; - uint64_t TmpCursor{}; - std::vector Chunk; + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + auto KeyIt = m_Index.find(Entry.Key); + uint64_t ChunkSize = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment).Size; + m_TotalSize.fetch_sub(ChunkSize); + m_Index.erase(KeyIt); + continue; + } + m_Index[Entry.Key].Location = Entry.Location; + } + }; - TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate); - TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate); + std::unordered_map MovedBlockChunks; + for (uint32_t BlockIndex : BlocksToReWrite) + { + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; + + Ref OldBlockFile; + { + RwLock::SharedLockScope _i(m_IndexLock); + OldBlockFile = m_ChunkBlocks[BlockIndex]; + } - for (const auto& Entry : ValidEntries) + const std::vector& KeepMap = KeepChunks[ChunkMapIndex]; + if (KeepMap.empty()) + { + const std::vector& DeleteMap = 
DeleteChunks[ChunkMapIndex]; + std::vector LogEntries = MakeDiskIndexEntries({}, DeleteMap); + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); + { + RwLock::ExclusiveLockScope _i(m_IndexLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + m_ChunkBlocks[BlockIndex] = nullptr; + } + AddToDeleted(DeleteMap); + ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'", + m_BucketDir / m_BucketName, + BlockIndex, + OldBlockFile->GetPath()); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; + ZEN_WARN("Failed to flag file '{}' for deletion, reason: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + continue; + } + + std::vector Chunk; + for (const IoHash& ChunkHash : KeepMap) + { + auto KeyIt = Index.find(ChunkHash); + const BlockStoreLocation ChunkLocation = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment); + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - DiskLocation NewLoc; + if (!NewBlockFile || (WriteOffset + Chunk.size() > MaxBlockSize)) + { + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed); + std::vector LogEntries = MakeDiskIndexEntries(MovedBlockChunks, {}); + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + if (NewBlockFile) { - NewLoc = DiskLocation(0, Loc.Size(), 0, Loc.GetFlags()); + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); } - else { - Chunk.resize(Loc.Size()); - m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset()); - - NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, Loc.GetFlags()); - 
TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor); - TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16); + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BucketDir / m_BucketName, + static_cast(std::numeric_limits::max()) + 1); + return; + } + while (m_ChunkBlocks.contains(NextBlockIndex)) + { + NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + } + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } - TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc}); + MovedCount += MovedBlockChunks.size(); + MovedBlockChunks.clear(); + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; + } + if (Space.Free < MaxBlockSize) + { + uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve(); + if (Space.Free + ReclaimedSpace < MaxBlockSize) + { + ZEN_WARN("garbage collect from '{}' FAILED, required disk space {}, free {}", + m_BucketDir / m_BucketName, + MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + RwLock::ExclusiveLockScope _l(m_IndexLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + 
m_ChunkBlocks.erase(NextBlockIndex); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BucketDir / m_BucketName, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; } - } - // Swap state - try + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedBlockChunks.emplace(ChunkHash, + DiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, + m_PayloadAlignment, + KeyIt->second.Location.Flags)); + WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); + } + Chunk.clear(); + if (NewBlockFile) { - fs::path SobsPath{m_BucketDir / "zen.sobs"}; - fs::path SlogPath{m_BucketDir / "zen.slog"}; - - m_SobsFile.Close(); - m_SlogFile.Close(); - - fs::remove(SobsPath); - fs::remove(SlogPath); - - fs::rename(TmpSobsPath, SobsPath); - fs::rename(TmpSlogPath, SlogPath); + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + NewBlockFile = {}; + } - const bool IsNew = false; - OpenLog(m_BucketDir, IsNew); + const std::vector& DeleteMap = DeleteChunks[ChunkMapIndex]; + std::vector LogEntries = MakeDiskIndexEntries(MovedBlockChunks, DeleteMap); + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); + { + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + m_ChunkBlocks[BlockIndex] = nullptr; } - catch (std::exception& Err) + MovedCount += MovedBlockChunks.size(); + AddToDeleted(DeleteMap); + MovedBlockChunks.clear(); + + ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'", + m_BucketDir / m_BucketName, + BlockIndex, + OldBlockFile->GetPath()); + std::error_code Ec; + 
OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) { - ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); - ResetSobStorage(); + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); } + OldBlockFile = nullptr; } } @@ -1144,16 +2316,20 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c // Update index - uint64_t EntryFlags = DiskLocation::kStandaloneFile; + uint8_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); + DiskLocation Loc(Value.Value.Size(), EntryFlags); IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); if (auto It = m_Index.find(HashKey); It == m_Index.end()) @@ -1255,10 +2431,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z auto It = m_Buckets.try_emplace(BucketName, BucketName); Bucket = &It.first->second; - std::filesystem::path bucketPath = m_RootDir; - bucketPath /= BucketName; + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; - Bucket->OpenOrCreate(bucketPath); + Bucket->OpenOrCreate(BucketPath); } } @@ -1363,11 +2539,11 @@ void ZenCacheDiskLayer::Flush() { std::vector Buckets; - Buckets.reserve(m_Buckets.size()); { RwLock::SharedLockScope _(m_Lock); + Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Buckets.push_back(&Kv.second); @@ -1419,6 +2595,9 @@ ZenCacheDiskLayer::TotalSize() const ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS +} + +namespace zen { using namespace std::literals; @@ -1427,10 +2606,18 @@ namespace testutils { IoBuffer CreateBinaryCacheValue(uint64_t Size) { - 
std::vector Data(size_t(Size / sizeof(uint32_t))); - std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; }); + static std::random_device rd; + static std::mt19937 g(rd()); - IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t)); + std::vector Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size()); Buf.SetContentType(ZenContentType::kBinary); return Buf; }; @@ -1737,6 +2924,7 @@ TEST_CASE("z$.gc") GcCtx.MaxCacheDuration(std::chrono::minutes(2)); GcCtx.CollectSmallObjects(true); + Zcs.Flush(); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) @@ -1751,6 +2939,357 @@ TEST_CASE("z$.gc") } } +TEST_CASE("z$.legacyconversion") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[] = {2041, + 1123, + 1223, + 1239, + 341, + 1412, + 912, + 774, + 341, + 431, + 554, + 1098, + 2048, + 339 + 64 * 1024, + 561 + 64 * 1024, + 16 + 64 * 1024, + 16 + 64 * 1024, + 2048, + 2048}; + size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); + size_t SingleBlockSize = 0; + std::vector Chunks; + Chunks.reserve(ChunkCount); + for (uint64_t Size : ChunkSizes) + { + Chunks.push_back(testutils::CreateBinaryCacheValue(Size)); + SingleBlockSize += Size; + } + + std::vector ChunkHashes; + ChunkHashes.reserve(ChunkCount); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CreateDirectories(TempDir.Path()); + + const std::string Bucket = "rightintwo"; + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); + const GcClock::TimePoint CurrentTime = GcClock::Now(); + + for (size_t i = 0; i < ChunkCount; i++) + { + Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]}); + } + + std::vector KeepChunks; + for (size_t i = 0; i < ChunkCount; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + 
GcContext GcCtx(CurrentTime + std::chrono::hours(2)); + GcCtx.MaxCacheDuration(std::chrono::minutes(2)); + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepChunks); + Zcs.Flush(); + Gc.CollectGarbage(GcCtx); + } + std::filesystem::path BucketDir = TempDir.Path() / Bucket; + std::filesystem::path BlocksBaseDir = BucketDir / "blocks"; + + std::filesystem::path CasPath = GetBlockPath(BlocksBaseDir, 1); + std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir); + std::filesystem::remove(LegacyDataPath); + std::filesystem::rename(CasPath, LegacyDataPath); + + std::vector LogEntries; + std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket); + if (std::filesystem::is_regular_file(IndexPath)) + { + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion && + Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) + { + LogEntries.resize(Header.EntryCount); + ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + } + } + ObjectIndexFile.Close(); + std::filesystem::remove(IndexPath); + } + + std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket); + { + TCasLogFile CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + LogEntries.reserve(CasLog.GetLogCount()); + CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); + } + TCasLogFile LegacyLog; + std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir); + LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); + + for (const 
DiskIndexEntry& Entry : LogEntries) + { + uint64_t Size; + uint64_t Offset; + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Size = Entry.Location.Location.StandaloneSize; + Offset = 0; + } + else + { + BlockStoreLocation Location = Entry.Location.GetBlockLocation(16); + Size = Location.Size; + Offset = Location.Offset; + } + LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast(Entry.Location.Flags) << 56); + LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation}; + LegacyLog.Append(LegacyEntry); + } + LegacyLog.Close(); + + std::filesystem::remove_all(BlocksBaseDir); + std::filesystem::remove(LogPath); + std::filesystem::remove(IndexPath); + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); + + for (size_t i = 0; i < ChunkCount; i += 2) + { + ZenCacheValue Value; + CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value)); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value)); + CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value)); + } + } +} + +# if 0 +TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) +{ + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 8192; + + std::vector ChunkHashes; + ChunkHashes.reserve(kChunkCount); + std::vector Chunks; + Chunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + ChunkHashes.emplace_back(Hash); + Chunks.emplace_back(Chunk); + } + + WorkerThreadPool ThreadPool(4); + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 32768, 16, true); + { + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + const IoBuffer& Chunk = Chunks[Idx]; + const IoHash& Hash = ChunkHashes[Idx]; + 
ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + + { + std::vector OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + IoBuffer Chunk = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Chunk); + CHECK(ChunkHash == Hash); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } + + std::unordered_set GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + { + std::vector OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + std::vector NewChunkHashes; + NewChunkHashes.reserve(kChunkCount); + std::vector NewChunks; + NewChunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunkHashes.emplace_back(Hash); + NewChunks.emplace_back(Chunk); + } + + RwLock ChunkHashesLock; + std::atomic_uint32_t AddedChunkCount; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + const IoBuffer& Chunk = NewChunks[Idx]; + const IoHash& Hash = NewChunkHashes[Idx]; + ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + AddedChunkCount.fetch_add(1); + }); + ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]); + if (Chunk) + { + CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); + } + }); + } + + while (AddedChunkCount.load() < kChunkCount) + { + std::vector AddedHashes; + { + 
RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const IoHash& ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + + { + std::vector AddedHashes; + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const IoHash& ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 77 == 0 && C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + } + { + for (const IoHash& ChunkHash : 
GcChunkHashes) + { + ThreadPool.ScheduleWork([&Cas, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } + } +} +# endif + #endif void -- cgit v1.2.3 From 4e7d13b038bda5806647530ddfca9de825001a5f Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 7 Apr 2022 22:20:35 +0200 Subject: cleaner GatherReferences --- zenserver/cache/structuredcachestore.cpp | 91 ++++++++++++++++++++------------ 1 file changed, 56 insertions(+), 35 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index c5ccef523..5c71ad7bb 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1659,9 +1659,20 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); - Stopwatch Timer; - const auto Guard = MakeGuard([this, &Timer] { - ZEN_INFO("gathered references from '{}' in {}", m_BucketDir / m_BucketName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([this, &TotalTimer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + ZEN_INFO("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", + m_BucketDir / m_BucketName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs)); }); const GcClock::TimePoint ExpireTime = @@ -1669,60 +1680,70 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) const GcClock::Tick 
ExpireTicks = ExpireTime.time_since_epoch().count(); - RwLock::SharedLockScope _(m_IndexLock); - - std::vector ValidKeys; - std::vector ExpiredKeys; - std::vector Cids; - std::vector Entries(m_Index.begin(), m_Index.end()); - - std::sort(Entries.begin(), Entries.end(), [](const auto& LHS, const auto& RHS) { - return LHS.second.LastAccess < RHS.second.LastAccess; - }); + IndexMap Index; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + Index = m_Index; + } - const auto ValidIt = std::lower_bound(Entries.begin(), Entries.end(), ExpireTicks, [](const auto& Kv, auto Ticks) { - const IndexEntry& Entry = Kv.second; - return Entry.LastAccess < Ticks; - }); + std::vector ExpiredKeys; + ExpiredKeys.reserve(1024); + for (const auto& Entry : Index) + { + if (Entry.second.LastAccess < ExpireTicks) + { + ExpiredKeys.push_back(Entry.first); + } + } + std::vector Cids; Cids.reserve(1024); - for (auto Kv = ValidIt; Kv != Entries.end(); ++Kv) + for (const auto& Key : ExpiredKeys) { - const IoHash& Key = Kv->first; - const DiskLocation& Loc = Kv->second.Location; + IndexEntry& Entry = Index[Key]; + const DiskLocation& Loc = Entry.Location; if (Loc.IsFlagSet(DiskLocation::kStructured)) { + if (Cids.size() > 1024) + { + GcCtx.ContributeCids(Cids); + Cids.clear(); + } + ZenCacheValue CacheValue; - if (!GetInlineCacheValue(Loc, CacheValue)) { - GetStandaloneCacheValue(Loc, Key, CacheValue); + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (!GetInlineCacheValue(Loc, 
CacheValue)) + { + GetStandaloneCacheValue(Loc, Key, CacheValue); + } } if (CacheValue.Value) { ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); - if (Cids.size() > 1024) - { - GcCtx.ContributeCids(Cids); - Cids.clear(); - } CbObject Obj(SharedBuffer{CacheValue.Value}); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } } } - _.ReleaseNow(); - - ValidKeys.reserve(std::distance(ValidIt, Entries.end())); - ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt)); - - std::transform(ValidIt, Entries.end(), std::back_inserter(ValidKeys), [](const auto& Kv) { return Kv.first; }); - std::transform(Entries.begin(), ValidIt, std::back_inserter(ExpiredKeys), [](const auto& Kv) { return Kv.first; }); GcCtx.ContributeCids(Cids); - GcCtx.ContributeCacheKeys(m_BucketName, std::move(ValidKeys), std::move(ExpiredKeys)); + GcCtx.ContributeCacheKeys(m_BucketName, {}, std::move(ExpiredKeys)); } void -- cgit v1.2.3 From 5e43b80df4cf8fff6bd350139783fb15d9d25207 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 7 Apr 2022 23:34:15 +0200 Subject: correct expire vs contribute --- zenserver/cache/structuredcachestore.cpp | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 5c71ad7bb..d28964502 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1694,21 +1694,20 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) std::vector ExpiredKeys; ExpiredKeys.reserve(1024); + + std::vector Cids; + Cids.reserve(1024); + for (const auto& Entry : Index) { + const IoHash& Key = Entry.first; if (Entry.second.LastAccess < ExpireTicks) { - ExpiredKeys.push_back(Entry.first); + ExpiredKeys.push_back(Key); + continue; } - } - std::vector Cids; - Cids.reserve(1024); - - for (const 
auto& Key : ExpiredKeys) - { - IndexEntry& Entry = Index[Key]; - const DiskLocation& Loc = Entry.Location; + const DiskLocation& Loc = Entry.second.Location; if (Loc.IsFlagSet(DiskLocation::kStructured)) { @@ -1743,7 +1742,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } GcCtx.ContributeCids(Cids); - GcCtx.ContributeCacheKeys(m_BucketName, {}, std::move(ExpiredKeys)); + GcCtx.ContributeCacheKeys(m_BucketName, std::move(ExpiredKeys)); } void -- cgit v1.2.3 From 4e6abaacede5bb6dea1ff788b1259efeb1212bcc Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 12 Apr 2022 13:58:38 +0200 Subject: Add z$.threadedinsert test --- zenserver/cache/structuredcachestore.cpp | 205 ++++++++++++++++++------------- 1 file changed, 118 insertions(+), 87 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index d28964502..0ce473e89 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -604,7 +604,7 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue m_CacheMap.insert_or_assign(HashKey, BucketValue(Value.Value, GcClock::TickCount())); } - m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed); + m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order_seq_cst); } ////////////////////////////////////////////////////////////////////////// @@ -1303,7 +1303,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is for (const auto& Entry : m_Index) { const DiskLocation& Location = Entry.second.Location; - m_TotalSize.fetch_add(Location.Size(), std::memory_order_release); + m_TotalSize.fetch_add(Location.Size(), std::memory_order_seq_cst); if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { continue; @@ -1524,7 +1524,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& 
WriteBlock->Write(Value.Value.Data(), ChunkSize, InsertOffset); m_SlogFile.Append(DiskIndexEntry); - m_TotalSize.fetch_add(ChunkSize, std::memory_order::relaxed); + m_TotalSize.fetch_add(ChunkSize, std::memory_order_seq_cst); { RwLock::ExclusiveLockScope __(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) @@ -1908,10 +1908,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); m_Index.insert({Key, {Loc, GcClock::TickCount()}}); - m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order_seq_cst); continue; } - m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order_seq_cst); DeletedSize += Entry.Location.Size(); DeletedCount++; } @@ -2043,7 +2043,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { auto KeyIt = m_Index.find(Entry.Key); uint64_t ChunkSize = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment).Size; - m_TotalSize.fetch_sub(ChunkSize); + m_TotalSize.fetch_sub(ChunkSize, std::memory_order_seq_cst); m_Index.erase(KeyIt); continue; } @@ -2364,7 +2364,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order_seq_cst); } ////////////////////////////////////////////////////////////////////////// @@ -3105,47 +3105,51 @@ TEST_CASE("z$.legacyconversion") } } -# if 0 TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - - CreateDirectories(CasConfig.RootDirectory); - const uint64_t kChunkSize = 1048; 
const int32_t kChunkCount = 8192; - std::vector ChunkHashes; - ChunkHashes.reserve(kChunkCount); - std::vector Chunks; + struct Chunk + { + std::string Bucket; + IoBuffer Buffer; + }; + std::unordered_map Chunks; Chunks.reserve(kChunkCount); + const std::string Bucket1 = "rightinone"; + const std::string Bucket2 = "rightintwo"; + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - ChunkHashes.emplace_back(Hash); - Chunks.emplace_back(Chunk); + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + } + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + } } - WorkerThreadPool ThreadPool(4); - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 32768, 16, true); + CreateDirectories(TempDir.Path()); + + WorkerThreadPool ThreadPool(4); + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); + const GcClock::TimePoint CurrentTime = GcClock::Now(); + { - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + for (const auto& Chunk : Chunks) { - const IoBuffer& Chunk = Chunks[Idx]; - const IoHash& Hash = ChunkHashes[Idx]; - ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); - }); + ThreadPool.ScheduleWork([&Zcs, &Chunk]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); }); } while (ThreadPool.PendingWork() > 0) { @@ -3153,17 +3157,19 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + const uint64_t TotalSize = Zcs.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * Chunks.size(), TotalSize); { - 
std::vector OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { - IoHash ChunkHash = OldChunkHashes[Idx]; - IoBuffer Chunk = Cas.FindChunk(ChunkHash); - IoHash Hash = IoHash::HashBuffer(Chunk); + ThreadPool.ScheduleWork([&Zcs, &Chunk]() { + std::string Bucket = Chunk.second.Bucket; + IoHash ChunkHash = Chunk.first; + ZenCacheValue CacheValue; + + CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); + IoHash Hash = IoHash::HashBuffer(CacheValue.Value); CHECK(ChunkHash == Hash); }); } @@ -3172,62 +3178,73 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) Sleep(1); } } - - std::unordered_set GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + std::unordered_map GcChunkHashes; + GcChunkHashes.reserve(Chunks.size()); + for (const auto& Chunk : Chunks) { - std::vector OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); - std::vector NewChunkHashes; - NewChunkHashes.reserve(kChunkCount); - std::vector NewChunks; - NewChunks.reserve(kChunkCount); + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + { + std::unordered_map NewChunks; for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - IoBuffer Chunk = CreateChunk(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - NewChunkHashes.emplace_back(Hash); - NewChunks.emplace_back(Chunk); + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + } + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + } } RwLock ChunkHashesLock; std::atomic_uint32_t AddedChunkCount; - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + for (const auto& Chunk : NewChunks) { - const IoBuffer& Chunk = NewChunks[Idx]; - const IoHash& Hash = NewChunkHashes[Idx]; - 
ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); + ThreadPool.ScheduleWork([&Zcs, Chunk, &AddedChunkCount]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); AddedChunkCount.fetch_add(1); }); - ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { - IoHash ChunkHash = OldChunkHashes[Idx]; - IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]); - if (Chunk) + } + + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, Chunk]() { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { - CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); } }); } - - while (AddedChunkCount.load() < kChunkCount) + while (AddedChunkCount.load() < NewChunks.size()) { - std::vector AddedHashes; + std::unordered_map AddedChunks; { RwLock::ExclusiveLockScope _(ChunkHashesLock); - AddedHashes.swap(NewChunkHashes); + AddedChunks.swap(NewChunks); } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const IoHash& ChunkHash : AddedHashes) + for (const auto& Chunk : AddedChunks) { - if (Cas.HaveChunk(ChunkHash)) + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { - GcChunkHashes.emplace(ChunkHash); + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; } } - std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; + std::vector KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; while (C < KeepHashes.size()) { if (C % 155 == 0) @@ -3249,7 +3266,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepHashes); - 
Cas.CollectGarbage(GcCtx); + Zcs.CollectGarbage(GcCtx); CasChunkSet& Deleted = GcCtx.DeletedCas(); Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } @@ -3260,27 +3277,41 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } { - std::vector AddedHashes; + std::unordered_map AddedChunks; { RwLock::ExclusiveLockScope _(ChunkHashesLock); - AddedHashes.swap(NewChunkHashes); + AddedChunks.swap(NewChunks); } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const IoHash& ChunkHash : AddedHashes) + for (const auto& Chunk : AddedChunks) { - if (Cas.HaveChunk(ChunkHash)) + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { - GcChunkHashes.emplace(ChunkHash); + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; } } - std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; + std::vector KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; while (C < KeepHashes.size()) { - if (C % 77 == 0 && C < KeepHashes.size() - 1) + if (C % 155 == 0) { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } } C++; } @@ -3288,17 +3319,18 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepHashes); - Cas.CollectGarbage(GcCtx); + Zcs.CollectGarbage(GcCtx); CasChunkSet& Deleted = GcCtx.DeletedCas(); Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } } { - for (const IoHash& ChunkHash : GcChunkHashes) + for (const 
auto& Chunk : GcChunkHashes) { - ThreadPool.ScheduleWork([&Cas, ChunkHash]() { - CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + ThreadPool.ScheduleWork([&Zcs, Chunk]() { + ZenCacheValue CacheValue; + CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); }); } while (ThreadPool.PendingWork() > 0) @@ -3308,7 +3340,6 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } } -# endif #endif -- cgit v1.2.3 From b0e81cd90b705c27bc94f0aa9bdf73eeadccc164 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 12 Apr 2022 14:19:05 +0200 Subject: Make sure we generate unique chunks --- zenserver/cache/structuredcachestore.cpp | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 0ce473e89..9ae01c5df 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -270,9 +270,9 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) } ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) -: m_RootDir(RootDir) -, GcStorage(Gc) +: GcStorage(Gc) , GcContributor(Gc) +, m_RootDir(RootDir) , m_DiskLayer(RootDir) { ZEN_INFO("initializing structured cache at '{}'", RootDir); @@ -2104,7 +2104,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) if (!NewBlockFile || (WriteOffset + Chunk.size() > MaxBlockSize)) { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed); + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); std::vector LogEntries = MakeDiskIndexEntries(MovedBlockChunks, {}); m_SlogFile.Append(LogEntries); m_SlogFile.Flush(); @@ -3127,15 +3127,28 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) for 
(int32_t Idx = 0; Idx < kChunkCount; ++Idx) { + + while(true) { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + break; } + while(true) { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + break; } } @@ -3144,7 +3157,6 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) WorkerThreadPool ThreadPool(4); CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path()); - const GcClock::TimePoint CurrentTime = GcClock::Now(); { for (const auto& Chunk : Chunks) @@ -3160,7 +3172,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) const uint64_t TotalSize = Zcs.StorageSize().DiskSize; CHECK_EQ(kChunkSize * Chunks.size(), TotalSize); - { + { for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Zcs, &Chunk]() { -- cgit v1.2.3 From 2f362392dd2eba0d949e261ce5781965b8943d30 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 12 Apr 2022 14:47:32 +0200 Subject: remove unneeded lock in threaded test --- zenserver/cache/structuredcachestore.cpp | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 9ae01c5df..2746dc673 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -3213,7 +3213,6 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } - RwLock ChunkHashesLock; std::atomic_uint32_t AddedChunkCount; for (const auto& Chunk : NewChunks) @@ -3236,13 +3235,8 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } while (AddedChunkCount.load() < NewChunks.size()) { - std::unordered_map AddedChunks; - { - RwLock::ExclusiveLockScope 
_(ChunkHashesLock); - AddedChunks.swap(NewChunks); - } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : AddedChunks) + for (const auto& Chunk : NewChunks) { ZenCacheValue CacheValue; if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) @@ -3289,13 +3283,8 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } { - std::unordered_map AddedChunks; - { - RwLock::ExclusiveLockScope _(ChunkHashesLock); - AddedChunks.swap(NewChunks); - } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : AddedChunks) + for (const auto& Chunk : NewChunks) { ZenCacheValue CacheValue; if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) -- cgit v1.2.3 From 1cc2c8b9547e5244134299707ade3eb5afbf6c55 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 12 Apr 2022 22:33:35 +0200 Subject: Wait for work to complete rather than being picked up --- zenserver/cache/structuredcachestore.cpp | 49 +++++++++++++++++++------------- 1 file changed, 29 insertions(+), 20 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 2746dc673..3ba4e6b05 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -3127,8 +3127,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - - while(true) + while (true) { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = HashBuffer(Chunk); @@ -3136,10 +3135,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) { continue; } - Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; break; } - while(true) + while (true) { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = 
HashBuffer(Chunk); @@ -3147,23 +3146,27 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) { continue; } - Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; break; } } CreateDirectories(TempDir.Path()); - WorkerThreadPool ThreadPool(4); - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path()); + WorkerThreadPool ThreadPool(4); + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); { + std::atomic WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &Chunk]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); }); + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); + WorkCompleted.fetch_add(1); + }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < Chunks.size()) { Sleep(1); } @@ -3172,10 +3175,11 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) const uint64_t TotalSize = Zcs.StorageSize().DiskSize; CHECK_EQ(kChunkSize * Chunks.size(), TotalSize); - { + { + std::atomic WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &Chunk]() { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { std::string Bucket = Chunk.second.Bucket; IoHash ChunkHash = Chunk.first; ZenCacheValue CacheValue; @@ -3183,9 +3187,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); IoHash Hash = IoHash::HashBuffer(CacheValue.Value); CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < Chunks.size()) { Sleep(1); } @@ -3213,24 +3218,26 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } - std::atomic_uint32_t AddedChunkCount; - + std::atomic WorkCompleted = 0; + std::atomic_uint32_t AddedChunkCount = 0; for (const auto& Chunk : NewChunks) { - ThreadPool.ScheduleWork([&Zcs, Chunk, 
&AddedChunkCount]() { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); }); } for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, Chunk]() { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { ZenCacheValue CacheValue; if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); } + WorkCompleted.fetch_add(1); }); } while (AddedChunkCount.load() < NewChunks.size()) @@ -3277,7 +3284,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < NewChunks.size() + Chunks.size()) { Sleep(1); } @@ -3326,15 +3333,17 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } { + std::atomic WorkCompleted = 0; for (const auto& Chunk : GcChunkHashes) { - ThreadPool.ScheduleWork([&Zcs, Chunk]() { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { ZenCacheValue CacheValue; CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + WorkCompleted.fetch_add(1); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < GcChunkHashes.size()) { Sleep(1); } -- cgit v1.2.3 From f7476199742256fed6f867afb6db8cecdacfe547 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 26 Apr 2022 13:36:02 +0200 Subject: Batch log removal of Cid and take proper lock when modifying m_CidMap (#80) * Batch log removal of Cid and take proper lock when modifying m_CidMap * variable name casing * Don't access m_Buckets without a lock --- zenserver/cache/structuredcachestore.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git 
a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 738e4c1fd..6bf513105 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1255,10 +1255,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z auto It = m_Buckets.try_emplace(BucketName, BucketName); Bucket = &It.first->second; - std::filesystem::path bucketPath = m_RootDir; - bucketPath /= BucketName; + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; - Bucket->OpenOrCreate(bucketPath); + Bucket->OpenOrCreate(BucketPath); } } @@ -1363,11 +1363,12 @@ void ZenCacheDiskLayer::Flush() { std::vector Buckets; - Buckets.reserve(m_Buckets.size()); + { RwLock::SharedLockScope _(m_Lock); + Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Buckets.push_back(&Kv.second); -- cgit v1.2.3 From 427a616c5a1abfe111cc5bb87526df5080de37d2 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 27 Apr 2022 08:34:13 +0200 Subject: trigger clang format --- zenserver/cache/structuredcachestore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 738e4c1fd..f499cf194 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1363,8 +1363,8 @@ void ZenCacheDiskLayer::Flush() { std::vector Buckets; - Buckets.reserve(m_Buckets.size()); + Buckets.reserve(m_Buckets.size()); { RwLock::SharedLockScope _(m_Lock); -- cgit v1.2.3 From be12749e0adde39d47875d3c4d2136dbcffbcb3d Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Sun, 1 May 2022 22:34:31 +0200 Subject: collectgarbage for compactcas and structured cache uses shared implementation --- zenserver/cache/structuredcachestore.cpp | 628 ++++++++----------------------- 1 file changed, 161 insertions(+), 467 deletions(-) 
(limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 53a479edb..d313cd0c2 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -127,25 +127,9 @@ namespace { static_assert(sizeof(LegacyDiskIndexEntry) == 36); - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".slog"; - const char* DataExtension = ".sobs"; - - std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) - { - ExtendablePathBuilder<256> Path; - - char BlockHexString[9]; - ToHexNumber(BlockIndex, BlockHexString); - - Path.Append(BlocksBasePath); - Path.AppendSeparator(); - Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); - Path.AppendSeparator(); - Path.Append(BlockHexString); - Path.Append(DataExtension); - return Path.ToPath(); - } + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + const char* LegacyDataExtension = ".sobs"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { @@ -169,7 +153,7 @@ namespace { std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir) { - return BucketDir / (std::string("zen") + DataExtension); + return BucketDir / (std::string("zen") + LegacyDataExtension); } std::vector MakeDiskIndexEntries(const std::unordered_map& MovedChunks, @@ -718,8 +702,6 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() std::vector Entries; { - RwLock::SharedLockScope __(m_InsertLock); - RwLock::SharedLockScope ___(m_IndexLock); Entries.resize(m_Index.size()); uint64_t EntryIndex = 0; @@ -896,7 +878,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) }); uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + while (std::filesystem::exists(BlockStore 
::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) { ++WriteBlockIndex; } @@ -1083,7 +1065,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) } LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex); CreateDirectories(BlockPath.parent_path()); BlockFile.Close(); std::filesystem::rename(LegacyDataPath, BlockPath); @@ -1152,7 +1134,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) BlockRanges.push_back(BlockRange); WriteBlockIndex++; - while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + while (std::filesystem::exists(BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) { ++WriteBlockIndex; } @@ -1191,7 +1173,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) NiceTimeSpanMs(ETA)); } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); + std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); BlockStoreFile ChunkBlock(BlockPath); ChunkBlock.Create(BlockRange.BlockSize); uint64_t Offset = 0; @@ -1299,7 +1281,8 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - std::unordered_set KnownBlocks; + std::vector KnownLocations; + KnownLocations.reserve(m_Index.size()); for (const auto& Entry : m_Index) { const DiskLocation& Location = Entry.second.Location; @@ -1308,62 +1291,11 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is { continue; } - KnownBlocks.insert(Location.GetBlockLocation(m_PayloadAlignment).BlockIndex); + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); + KnownLocations.push_back(BlockLocation); } - if 
(std::filesystem::is_directory(m_BlocksBasePath)) - { - std::vector FoldersToScan; - FoldersToScan.push_back(m_BlocksBasePath); - size_t FolderOffset = 0; - while (FolderOffset < FoldersToScan.size()) - { - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) - { - if (Entry.is_directory()) - { - FoldersToScan.push_back(Entry.path()); - continue; - } - if (Entry.is_regular_file()) - { - const std::filesystem::path Path = Entry.path(); - if (Path.extension() != DataExtension) - { - continue; - } - std::string FileName = Path.stem().string(); - uint32_t BlockIndex; - bool OK = ParseHexNumber(FileName, BlockIndex); - if (!OK) - { - continue; - } - if (!KnownBlocks.contains(BlockIndex)) - { - // Log removing unreferenced block - // Clear out unused blocks - ZEN_INFO("removing unused block for '{}' at '{}'", m_BucketDir / m_BucketName, Path); - std::error_code Ec; - std::filesystem::remove(Path, Ec); - if (Ec) - { - ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); - } - continue; - } - Ref BlockFile = new BlockStoreFile(Path); - BlockFile->Open(); - m_ChunkBlocks[BlockIndex] = BlockFile; - } - } - ++FolderOffset; - } - } - else - { - CreateDirectories(m_BlocksBasePath); - } + m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0)) { @@ -1390,14 +1322,14 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H bool ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) { - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); + + Ref ChunkBlock = m_BlockStore.GetChunkBlock(Location); + if (!ChunkBlock) { return false; } - const BlockStoreLocation& Location = Loc.GetBlockLocation(m_PayloadAlignment); - Ref ChunkBlock = 
m_ChunkBlocks[Location.BlockIndex]; - OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size); OutValue.Value.SetContentType(Loc.GetContentType()); @@ -1437,15 +1369,17 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal { IndexEntry& Entry = It.value(); Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + DiskLocation Location = Entry.Location; + _.ReleaseNow(); - if (GetInlineCacheValue(Entry.Location, OutValue)) + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + return GetStandaloneCacheValue(Location, HashKey, OutValue); + } + if (GetInlineCacheValue(Location, OutValue)) { return true; } - - _.ReleaseNow(); - - return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue); } return false; @@ -1463,84 +1397,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& { return PutStandaloneCacheValue(HashKey, Value); } - - // Small object put - - uint8_t EntryFlags = 0; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - uint64_t ChunkSize = Value.Value.Size(); - - uint32_t WriteBlockIndex; - Ref WriteBlock; - uint64_t InsertOffset; - - { - RwLock::ExclusiveLockScope _(m_InsertLock); - - WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - bool IsWriting = m_WriteBlock != nullptr; - if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > MaxBlockSize) - { - if (m_WriteBlock) - { - m_WriteBlock = nullptr; - } - { - RwLock::ExclusiveLockScope __(m_IndexLock); - if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) - { - throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BucketDir / m_BucketName)); - } - WriteBlockIndex += IsWriting ? 
1 : 0; - while (m_ChunkBlocks.contains(WriteBlockIndex)) - { - WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - m_WriteBlock = new BlockStoreFile(BlockPath); - m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - } - m_CurrentInsertOffset = 0; - m_WriteBlock->Create(MaxBlockSize); - } - InsertOffset = m_CurrentInsertOffset; - m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); - WriteBlock = m_WriteBlock; - } - - DiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment, EntryFlags); - const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; - - WriteBlock->Write(Value.Value.Data(), ChunkSize, InsertOffset); - m_SlogFile.Append(DiskIndexEntry); - - m_TotalSize.fetch_add(ChunkSize, std::memory_order_seq_cst); - { - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? 
- // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - IndexEntry& Entry = It.value(); - Entry.Location = Location; - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - } - else - { - m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); - } - } + PutInlineCacheValue(HashKey, Value); } void @@ -1555,21 +1412,10 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { - { - RwLock::ExclusiveLockScope _(m_InsertLock); - if (m_CurrentInsertOffset > 0) - { - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - m_WriteBlock = nullptr; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - m_CurrentInsertOffset = 0; - } - } - RwLock::SharedLockScope _(m_IndexLock); + m_BlockStore.Flush(); + RwLock::SharedLockScope _(m_IndexLock); MakeIndexSnapshot(); - SaveManifest(); } @@ -1615,20 +1461,22 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) ZenCacheValue Value; - if (GetInlineCacheValue(Loc, Value)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - // Validate contents + if (GetInlineCacheValue(Loc, Value)) + { + // Validate contents + continue; + } } else if (GetStandaloneCacheValue(Loc, HashKey, Value)) { // Note: we cannot currently validate contents since we don't // have a content hash! 
+ continue; } - else - { - // Value not found - BadKeys.push_back(HashKey); - } + // Value not found + BadKeys.push_back(HashKey); } } @@ -1726,18 +1574,23 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); - if (!GetInlineCacheValue(Loc, CacheValue)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - GetStandaloneCacheValue(Loc, Key, CacheValue); + if (!GetStandaloneCacheValue(Loc, Key, CacheValue)) + { + continue; + } + } + else if (!GetInlineCacheValue(Loc, CacheValue)) + { + continue; } } - if (CacheValue.Value) - { - ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); - CbObject Obj(SharedBuffer{CacheValue.Value}); - Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); - } + ZEN_ASSERT(CacheValue.Value); + ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); + CbObject Obj(SharedBuffer{CacheValue.Value}); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } } @@ -1797,10 +1650,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_SlogFile.Flush(); - IndexMap Index; - size_t BlockCount; - uint64_t ExcludeBlockIndex = 0x800000000ull; - std::span ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName); std::vector DeleteCacheKeys; DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); @@ -1816,30 +1665,27 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); return; } + + IndexMap Index; + BlockStore::ReclaimSnapshotState BlockStoreState; { - RwLock::SharedLockScope __(m_InsertLock); - RwLock::SharedLockScope ___(m_IndexLock); + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + 
uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); { - Stopwatch Timer; - const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); if (m_Index.empty()) { ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); return; } - if (m_WriteBlock) - { - ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - } - __.ReleaseNow(); + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); } SaveManifest(); - Index = m_Index; - BlockCount = m_ChunkBlocks.size(); + Index = m_Index; for (const IoHash& Key : DeleteCacheKeys) { @@ -1936,295 +1782,102 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { return; } - std::unordered_map BlockIndexToChunkMapIndex; - std::vector> KeepChunks; - std::vector> DeleteChunks; - - BlockIndexToChunkMapIndex.reserve(BlockCount); - KeepChunks.reserve(BlockCount); - DeleteChunks.reserve(BlockCount); - size_t GuesstimateCountPerBlock = TotalChunkHashes.size() / BlockCount / 2; - - uint64_t DeleteCount = 0; - - uint64_t NewTotalSize = 0; + TotalChunkCount = TotalChunkHashes.size(); - std::unordered_set Expired; - Expired.insert(DeleteCacheKeys.begin(), DeleteCacheKeys.end()); + std::vector ChunkLocations; + std::vector KeepChunkIndexes; + std::vector ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = Index.find(ChunkHash); - const DiskLocation& Location = KeyIt->second.Location; - BlockStoreLocation BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); - - uint32_t BlockIndex = 
BlockLocation.BlockIndex; - - if (static_cast(BlockIndex) == ExcludeBlockIndex) - { - return; - } - - auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex); - size_t ChunkMapIndex = 0; - if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) - { - ChunkMapIndex = KeepChunks.size(); - BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; - KeepChunks.resize(ChunkMapIndex + 1); - KeepChunks.back().reserve(GuesstimateCountPerBlock); - DeleteChunks.resize(ChunkMapIndex + 1); - DeleteChunks.back().reserve(GuesstimateCountPerBlock); - } - else - { - ChunkMapIndex = BlockIndexPtr->second; - } + auto KeyIt = Index.find(ChunkHash); + const DiskLocation& DiskLocation = KeyIt->second.Location; + BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; if (Keep) { - std::vector& ChunkMap = KeepChunks[ChunkMapIndex]; - ChunkMap.push_back(ChunkHash); - NewTotalSize += BlockLocation.Size; - } - else - { - std::vector& ChunkMap = DeleteChunks[ChunkMapIndex]; - ChunkMap.push_back(ChunkHash); - DeleteCount++; + KeepChunkIndexes.push_back(ChunkIndex); } }); - std::unordered_set BlocksToReWrite; - BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); - for (const auto& Entry : BlockIndexToChunkMapIndex) - { - uint32_t BlockIndex = Entry.first; - size_t ChunkMapIndex = Entry.second; - const std::vector& ChunkMap = DeleteChunks[ChunkMapIndex]; - if (ChunkMap.empty()) - { - continue; - } - BlocksToReWrite.insert(BlockIndex); - } + size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); ZEN_INFO("garbage collect from '{}' DISABLED, found #{} 
{} chunks of total #{} {}", m_BucketDir / m_BucketName, DeleteCount, - NiceBytes(TotalSize - NewTotalSize), + 0, // NiceBytes(TotalSize - NewTotalSize), TotalChunkCount, NiceBytes(TotalSize)); return; } - auto AddToDeleted = [this, &Index, &DeletedCount, &DeletedSize](const std::vector& DeletedEntries) { - for (const IoHash& ChunkHash : DeletedEntries) - { - const DiskLocation& Location = Index[ChunkHash].Location; - ZEN_ASSERT(!Location.IsFlagSet(DiskLocation::kStandaloneFile)); - DeletedSize += Index[ChunkHash].Location.GetBlockLocation(m_PayloadAlignment).Size; - } - DeletedCount += DeletedEntries.size(); - }; - - // Move all chunks in blocks that have chunks removed to new blocks - - Ref NewBlockFile; - uint64_t WriteOffset = 0; - uint32_t NewBlockIndex = 0; - - auto UpdateLocations = [this](const std::span& Entries) { - for (const DiskIndexEntry& Entry : Entries) - { - if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + std::vector DeletedChunks; + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [this, &DeletedChunks, &ChunkIndexToChunkHash, &Index, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( + uint32_t, + const std::unordered_map& MovedChunks, + const std::vector& RemovedChunks) { + std::vector LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) { - auto KeyIt = m_Index.find(Entry.Key); - uint64_t ChunkSize = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment).Size; - m_TotalSize.fetch_sub(ChunkSize, std::memory_order_seq_cst); - m_Index.erase(KeyIt); - continue; + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; + LogEntries.push_back( + {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); + 
} + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; + LogEntries.push_back({.Key = ChunkHash, + .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), + m_PayloadAlignment, + OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); + DeletedChunks.push_back(ChunkHash); } - m_Index[Entry.Key].Location = Entry.Location; - } - }; - - std::unordered_map MovedBlockChunks; - for (uint32_t BlockIndex : BlocksToReWrite) - { - const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; - - Ref OldBlockFile; - { - RwLock::SharedLockScope _i(m_IndexLock); - OldBlockFile = m_ChunkBlocks[BlockIndex]; - } - const std::vector& KeepMap = KeepChunks[ChunkMapIndex]; - if (KeepMap.empty()) - { - const std::vector& DeleteMap = DeleteChunks[ChunkMapIndex]; - std::vector LogEntries = MakeDiskIndexEntries({}, DeleteMap); m_SlogFile.Append(LogEntries); m_SlogFile.Flush(); { - RwLock::ExclusiveLockScope _i(m_IndexLock); + RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; - const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + const auto ____ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); - UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex] = nullptr; - } - AddToDeleted(DeleteMap); - ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'", - m_BucketDir / m_BucketName, - BlockIndex, - OldBlockFile->GetPath()); - std::error_code Ec; - OldBlockFile->MarkAsDeleteOnClose(Ec); - if (Ec) - { - ZEN_WARN("Failed to flag file '{}' for deletion, reason: '{}'", OldBlockFile->GetPath(), Ec.message()); - } - continue; - } - - std::vector Chunk; - for (const IoHash& ChunkHash : KeepMap) - { - auto KeyIt = 
Index.find(ChunkHash); - const BlockStoreLocation ChunkLocation = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment); - Chunk.resize(ChunkLocation.Size); - OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - - if (!NewBlockFile || (WriteOffset + Chunk.size() > MaxBlockSize)) - { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); - std::vector LogEntries = MakeDiskIndexEntries(MovedBlockChunks, {}); - m_SlogFile.Append(LogEntries); - m_SlogFile.Flush(); - - if (NewBlockFile) - { - NewBlockFile->Truncate(WriteOffset); - NewBlockFile->Flush(); - } + for (const DiskIndexEntry& Entry : LogEntries) { - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - UpdateLocations(LogEntries); - if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + if (Entry.Location.GetFlags() & DiskLocation::kTombStone) { - ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", - m_BucketDir / m_BucketName, - static_cast(std::numeric_limits::max()) + 1); - return; - } - while (m_ChunkBlocks.contains(NextBlockIndex)) - { - NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - } - std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); - NewBlockFile = new BlockStoreFile(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; - } - - MovedCount += MovedBlockChunks.size(); - MovedBlockChunks.clear(); - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return; - } - if (Space.Free < MaxBlockSize) - { - uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve(); - if 
(Space.Free + ReclaimedSpace < MaxBlockSize) - { - ZEN_WARN("garbage collect from '{}' FAILED, required disk space {}, free {}", - m_BucketDir / m_BucketName, - MaxBlockSize, - NiceBytes(Space.Free + ReclaimedSpace)); - RwLock::ExclusiveLockScope _l(m_IndexLock); - Stopwatch Timer; - const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - m_ChunkBlocks.erase(NextBlockIndex); - return; + m_Index.erase(Entry.Key); + uint64_t ChunkSize = Entry.Location.GetBlockLocation(m_PayloadAlignment).Size; + m_TotalSize.fetch_sub(ChunkSize); + continue; } - - ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", - m_BucketDir / m_BucketName, - ReclaimedSpace, - NiceBytes(Space.Free + ReclaimedSpace)); + m_Index[Entry.Key].Location = Entry.Location; } - NewBlockFile->Create(MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; } + }); - NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - MovedBlockChunks.emplace(ChunkHash, - DiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, - m_PayloadAlignment, - KeyIt->second.Location.Flags)); - WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); - } - Chunk.clear(); - if (NewBlockFile) - { - NewBlockFile->Truncate(WriteOffset); - NewBlockFile->Flush(); - NewBlockFile = {}; - } - - const std::vector& DeleteMap = DeleteChunks[ChunkMapIndex]; - std::vector LogEntries = MakeDiskIndexEntries(MovedBlockChunks, DeleteMap); - m_SlogFile.Append(LogEntries); - m_SlogFile.Flush(); - { - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); 
- }); - UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex] = nullptr; - } - MovedCount += MovedBlockChunks.size(); - AddToDeleted(DeleteMap); - MovedBlockChunks.clear(); - - ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'", - m_BucketDir / m_BucketName, - BlockIndex, - OldBlockFile->GetPath()); - std::error_code Ec; - OldBlockFile->MarkAsDeleteOnClose(Ec); - if (Ec) - { - ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); - } - OldBlockFile = nullptr; - } + GcCtx.DeletedCas(DeletedChunks); } void @@ -2367,6 +2020,47 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c m_TotalSize.fetch_add(Loc.Size(), std::memory_order_seq_cst); } +void +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) +{ + uint8_t EntryFlags = 0; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + uint64_t ChunkSize = Value.Value.Size(); + + m_BlockStore.WriteChunk(Value.Value.Data(), + ChunkSize, + m_PayloadAlignment, + [this, &HashKey, EntryFlags](const BlockStoreLocation& BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; + m_SlogFile.Append(DiskIndexEntry); + m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order_seq_cst); + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + // TODO: should check if write is idempotent and bail out if it is? 
+ // this would requiring comparing contents on disk unless we add a + // content hash to the index entry + IndexEntry& Entry = It.value(); + Entry.Location = Location; + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + } + else + { + m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); + } + }); +} + ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) @@ -3026,7 +2720,7 @@ TEST_CASE("z$.legacyconversion") std::filesystem::path BucketDir = TempDir.Path() / Bucket; std::filesystem::path BlocksBaseDir = BucketDir / "blocks"; - std::filesystem::path CasPath = GetBlockPath(BlocksBaseDir, 1); + std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1); std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir); std::filesystem::remove(LegacyDataPath); std::filesystem::rename(CasPath, LegacyDataPath); -- cgit v1.2.3 From 6e6035499b3fe40b22e1be5aee9ac3a9675d27b0 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Sun, 1 May 2022 22:55:43 +0200 Subject: remove m_TotalSize for blockstore fix scrub logic in structured cache store --- zenserver/cache/structuredcachestore.cpp | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index d313cd0c2..f26d599ab 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1376,10 +1376,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal { return GetStandaloneCacheValue(Location, HashKey, OutValue); } - if (GetInlineCacheValue(Location, OutValue)) - { - return true; - } + return GetInlineCacheValue(Location, OutValue); } return false; @@ -1463,16 +1460,16 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& 
Ctx) if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - if (GetInlineCacheValue(Loc, Value)) + if (GetStandaloneCacheValue(Loc, HashKey, Value)) { - // Validate contents + // Note: we cannot currently validate contents since we don't + // have a content hash! continue; } } - else if (GetStandaloneCacheValue(Loc, HashKey, Value)) + else if (GetInlineCacheValue(Loc, Value)) { - // Note: we cannot currently validate contents since we don't - // have a content hash! + // Validate contents continue; } // Value not found @@ -1724,6 +1721,12 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); if (m_Index.contains(Key)) { // Someone added it back, let the file on disk be -- cgit v1.2.3 From 75b1dd112aead7c5246fa84928b9cd96dde49cbc Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Sun, 1 May 2022 23:34:20 +0200 Subject: respect Ctx.RunRecovery() --- zenserver/cache/structuredcachestore.cpp | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index f26d599ab..9ae5b0f17 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1497,6 +1497,11 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) m_Index.erase(BadKey); } } + + // Let whomever it concerns know about the bad chunks. 
This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCasChunks(BadKeys); } void -- cgit v1.2.3 From c89190f7fabf8a08cda2255937dc99ca35972210 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 10:18:31 +0200 Subject: Move bulk of MigrateLegacyData to blockstore.cpp --- zenserver/cache/structuredcachestore.cpp | 1 - 1 file changed, 1 deletion(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 9ae5b0f17..5cebaa948 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1836,7 +1836,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_PayloadAlignment, false, [this, &DeletedChunks, &ChunkIndexToChunkHash, &Index, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( - uint32_t, const std::unordered_map& MovedChunks, const std::vector& RemovedChunks) { std::vector LogEntries; -- cgit v1.2.3 From 48f2e3af59e2a06c81e37170db95e432b148e5e8 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 10:48:57 +0200 Subject: refactor structured cache to use blockstore migrate --- zenserver/cache/structuredcachestore.cpp | 309 +++++++------------------------ 1 file changed, 66 insertions(+), 243 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 5cebaa948..e9c051f88 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -877,35 +877,17 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) NiceBytes(TotalSize)); }); - uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + uint64_t BlockFileSize = 0; { - ++WriteBlockIndex; + BasicFile 
BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + BlockFileSize = BlockFile.FileSize(); } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return 0; - } - - if (Space.Free < MaxBlockSize) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", - m_BucketDir / m_BucketName, - MaxBlockSize, - NiceBytes(Space.Free)); - return 0; - } - - BasicFile BlockFile; - BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - std::unordered_map LegacyDiskIndex; uint64_t InvalidEntryCount = 0; + size_t BlockChunkCount = 0; TCasLogFile LegacyCasLog; LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); { @@ -942,7 +924,6 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) 0); std::vector BadEntries; - uint64_t BlockFileSize = BlockFile.FileSize(); for (const auto& Entry : LegacyDiskIndex) { const LegacyDiskIndexEntry& Record(Entry.second); @@ -952,6 +933,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) } if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize) { + BlockChunkCount++; continue; } ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); @@ -972,7 +954,6 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) if (LegacyDiskIndex.empty()) { LegacyCasLog.Close(); - BlockFile.Close(); if (CleanSource) { // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find @@ -988,250 +969,92 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) return 0; } - uint64_t BlockChunkCount = 0; - uint64_t BlockTotalSize = 0; - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyDiskIndexEntry& 
Record(Entry.second); - if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - continue; - } - BlockChunkCount++; - BlockTotalSize += Record.Location.Size(); - } - - uint64_t RequiredDiskSpace = BlockTotalSize + ((m_PayloadAlignment - 1) * BlockChunkCount); - uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; - if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", - m_BucketDir / m_BucketName, - MaxRequiredBlockCount, - BlockStoreDiskLocation::MaxBlockIndex); - return 0; - } - - constexpr const uint64_t DiskReserve = 1ul << 28; - - if (CleanSource) - { - if (Space.Free < (MaxBlockSize + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_BucketDir / m_BucketName, - NiceBytes(MaxBlockSize + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - else - { - if (Space.Free < (RequiredDiskSpace + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_BucketDir / m_BucketName, - NiceBytes(RequiredDiskSpace + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); CreateDirectories(LogPath.parent_path()); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - if (CleanSource && (MaxRequiredBlockCount < 2)) - { - std::vector LogEntries; - LogEntries.reserve(LegacyDiskIndex.size()); + std::unordered_map ChunkIndexToChunkHash; + std::vector ChunkLocations; + ChunkIndexToChunkHash.reserve(BlockChunkCount); + ChunkLocations.reserve(BlockChunkCount); - // We can use the block as is, just move it and add the blocks to our new log - for (auto& Entry : LegacyDiskIndex) - { - const LegacyDiskIndexEntry& Record(Entry.second); + std::vector LogEntries; + LogEntries.reserve(LegacyDiskIndex.size() 
- BlockChunkCount); - DiskLocation NewLocation; - uint8_t Flags = 0xff & (Record.Location.Flags() >> 56); - if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - NewLocation = DiskLocation(Record.Location.Size(), Flags); - } - else - { - BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.Offset(), Record.Location.Size()); - NewLocation = DiskLocation(NewChunkLocation, m_PayloadAlignment, Flags); - } - LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); - } - std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - CreateDirectories(BlockPath.parent_path()); - BlockFile.Close(); - std::filesystem::rename(LegacyDataPath, BlockPath); - CasLog.Append(LogEntries); - for (const DiskIndexEntry& Entry : LogEntries) + for (const auto& Entry : LegacyDiskIndex) + { + const IoHash& ChunkHash = Entry.first; + const LegacyDiskLocation& Location = Entry.second.Location; + if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + uint8_t Flags = 0xff & (Location.Flags() >> 56); + DiskLocation NewLocation = DiskLocation(Location.Size(), Flags); + LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); + continue; } - - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()}); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + TotalSize += Location.Size(); } - else + for (const DiskIndexEntry& Entry : LogEntries) { - std::vector ChunkHashes; - ChunkHashes.reserve(LegacyDiskIndex.size()); - for (const auto& Entry : LegacyDiskIndex) - { - ChunkHashes.push_back(Entry.first); - } - - std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { - auto LhsKeyIt = LegacyDiskIndex.find(Lhs); - auto RhsKeyIt = 
LegacyDiskIndex.find(Rhs); - return LhsKeyIt->second.Location.Offset() < RhsKeyIt->second.Location.Offset(); - }); - - uint64_t BlockSize = 0; - uint64_t BlockOffset = 0; - std::vector NewLocations; - struct BlockData - { - std::vector> Chunks; - uint64_t BlockOffset; - uint64_t BlockSize; - uint32_t BlockIndex; - }; - - std::vector BlockRanges; - std::vector> Chunks; - BlockRanges.reserve(MaxRequiredBlockCount); - for (const IoHash& ChunkHash : ChunkHashes) - { - const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; - const LegacyDiskLocation& LegacyChunkLocation = LegacyEntry.Location; - - if (LegacyChunkLocation.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - // For standalone files we just store the chunk hash an use the size from the legacy index as is - Chunks.push_back({ChunkHash, {}}); - continue; - } - - uint64_t ChunkOffset = LegacyChunkLocation.Offset(); - uint64_t ChunkSize = LegacyChunkLocation.Size(); - uint64_t ChunkEnd = ChunkOffset + ChunkSize; - - if (BlockSize == 0) - { - BlockOffset = ChunkOffset; - } - if ((ChunkEnd - BlockOffset) > MaxBlockSize) - { - BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; - BlockRange.Chunks.swap(Chunks); - BlockRanges.push_back(BlockRange); - - WriteBlockIndex++; - while (std::filesystem::exists(BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - BlockOffset = ChunkOffset; - BlockSize = 0; - } - BlockSize = RoundUp(BlockSize, m_PayloadAlignment); - BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; - Chunks.push_back({ChunkHash, ChunkLocation}); - BlockSize = ChunkEnd - BlockOffset; - } - if (BlockSize > 0) - { - BlockRanges.push_back( - {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); - } - Stopwatch WriteBlockTimer; - - std::reverse(BlockRanges.begin(), 
BlockRanges.end()); - std::vector Buffer(1 << 28); - for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) - { - const BlockData& BlockRange = BlockRanges[Idx]; - if (Idx > 0) - { - uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; - uint64_t Completed = BlockOffset + BlockSize - Remaining; - uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; - - ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", - m_BucketDir / m_BucketDir, - Idx, - BlockRanges.size(), - NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), - NiceBytes(BlockOffset + BlockSize), - NiceTimeSpanMs(ETA)); - } - - std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); - BlockStoreFile ChunkBlock(BlockPath); - ChunkBlock.Create(BlockRange.BlockSize); - uint64_t Offset = 0; - while (Offset < BlockRange.BlockSize) - { - uint64_t Size = BlockRange.BlockSize - Offset; - if (Size > Buffer.size()) - { - Size = Buffer.size(); - } - BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); - ChunkBlock.Write(Buffer.data(), Size, Offset); - Offset += Size; - } - ChunkBlock.Truncate(Offset); - ChunkBlock.Flush(); + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + CasLog.Append(LogEntries); + m_BlockStore.Split( + ChunkLocations, + LegacyDataPath, + m_BlocksBasePath, + MaxBlockSize, + BlockStoreDiskLocation::MaxBlockIndex + 1, + m_PayloadAlignment, + CleanSource, + [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( + const std::vector>& MovedChunks) { std::vector LogEntries; - LogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; - - DiskLocation NewLocation; - uint8_t Flags 
= 0xff & (LegacyEntry.Location.Flags() >> 56); - if (LegacyEntry.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - NewLocation = DiskLocation(LegacyEntry.Location.Size(), Flags); - } - else - { - NewLocation = DiskLocation(Entry.second, m_PayloadAlignment, Flags); - } - LogEntries.push_back({.Key = Entry.first, .Location = NewLocation}); + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& OldLocation = OldEntry.Location; + uint8_t Flags = 0xff & (OldLocation.Flags() >> 56); + LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)}); } - CasLog.Append(LogEntries); for (const DiskIndexEntry& Entry : LogEntries) { m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); } - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; - + CasLog.Append(LogEntries); + CasLog.Flush(); if (CleanSource) { std::vector LegacyLogEntries; - LegacyLogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LegacyLogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - LegacyLogEntries.push_back( - {.Key = Entry.first, .Location = LegacyDiskLocation(0, 0, 0, LegacyDiskLocation::kTombStone)}); + size_t ChunkIndex = Entry.first; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& OldLocation = OldEntry.Location; + LegacyDiskLocation NewLocation(OldLocation.Offset(), + OldLocation.Size(), + 0, + OldLocation.Flags() | LegacyDiskLocation::kTombStone); + LegacyLogEntries.push_back(LegacyDiskIndexEntry(ChunkHash, NewLocation)); } LegacyCasLog.Append(LegacyLogEntries); - BlockFile.SetFileSize(BlockRange.BlockOffset); + 
LegacyCasLog.Flush(); } - } - } - BlockFile.Close(); + MigratedBlockCount++; + MigratedChunkCount += MovedChunks.size(); + }); + LegacyCasLog.Close(); CasLog.Close(); -- cgit v1.2.3 From dc589650427f2ab444a7ebf78fb59ee751a4c2c8 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 10:53:41 +0200 Subject: use std::vector> instead of map --- zenserver/cache/structuredcachestore.cpp | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index e9c051f88..9cfb5fbf3 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -156,24 +156,6 @@ namespace { return BucketDir / (std::string("zen") + LegacyDataExtension); } - std::vector MakeDiskIndexEntries(const std::unordered_map& MovedChunks, - const std::vector& DeletedChunks) - { - std::vector result; - result.reserve(MovedChunks.size()); - for (const auto& MovedEntry : MovedChunks) - { - result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second}); - } - for (const IoHash& ChunkHash : DeletedChunks) - { - DiskLocation Location; - Location.Flags |= DiskLocation::kTombStone; - result.push_back({.Key = ChunkHash, .Location = Location}); - } - return result; - } - bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -1659,8 +1641,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_PayloadAlignment, false, [this, &DeletedChunks, &ChunkIndexToChunkHash, &Index, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( - const std::unordered_map& MovedChunks, - const std::vector& RemovedChunks) { + const std::vector>& MovedChunks, + const std::vector& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); for (const auto& Entry : MovedChunks) -- cgit v1.2.3 From 
80a39f97f465465466ccd2d5914421db55efb80e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 11:01:12 +0200 Subject: add back gc space reclaim call --- zenserver/cache/structuredcachestore.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 9cfb5fbf3..5b08ed83c 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1687,7 +1687,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_Index[Entry.Key].Location = Entry.Location; } } - }); + }, + [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); GcCtx.DeletedCas(DeletedChunks); } -- cgit v1.2.3 From 10690e805f45a590094a659c8e1f6482d12aaf8e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 11:03:04 +0200 Subject: cleanup --- zenserver/cache/structuredcachestore.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 5b08ed83c..143f43deb 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -995,7 +995,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) m_PayloadAlignment, CleanSource, [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( - const std::vector>& MovedChunks) { + const BlockStore::MovedChunksArray& MovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size()); for (const auto& Entry : MovedChunks) @@ -1598,7 +1598,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) TotalChunkCount = TotalChunkHashes.size(); std::vector ChunkLocations; - std::vector KeepChunkIndexes; + BlockStore::ChunkIndexArray 
KeepChunkIndexes; std::vector ChunkIndexToChunkHash; ChunkLocations.reserve(TotalChunkCount); ChunkLocations.reserve(TotalChunkCount); @@ -1641,8 +1641,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_PayloadAlignment, false, [this, &DeletedChunks, &ChunkIndexToChunkHash, &Index, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( - const std::vector>& MovedChunks, - const std::vector& RemovedChunks) { + const BlockStore::MovedChunksArray& MovedChunks, + const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); for (const auto& Entry : MovedChunks) -- cgit v1.2.3 From f14272b7adfb562a39295324a92c7bbf31c9bd3e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 11:22:14 +0200 Subject: restore cg comment --- zenserver/cache/structuredcachestore.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 143f43deb..6311fc2c5 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1410,7 +1410,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - std::vector ExpiredStandaloneEntries; + ZEN_INFO("collecting garbage from '{}'", m_BucketDir / m_BucketName); Stopwatch TotalTimer; uint64_t WriteBlockTimeUs = 0; @@ -1473,6 +1473,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) return; } + std::vector ExpiredStandaloneEntries; IndexMap Index; BlockStore::ReclaimSnapshotState BlockStoreState; { -- cgit v1.2.3 From 56381dc0de2d19a373c132b9a624308dc88e31bd Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 2 May 2022 14:35:20 +0200 Subject: removed redundant pragma pack --- zenserver/cache/structuredcachestore.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 
deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 6311fc2c5..015912ce9 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -39,9 +39,6 @@ ZEN_THIRD_PARTY_INCLUDES_END ////////////////////////////////////////////////////////////////////////// -#pragma pack(push) -#pragma pack(1) - namespace zen { namespace { @@ -863,7 +860,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) { BasicFile BlockFile; BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - BlockFileSize = BlockFile.FileSize(); + BlockFileSize = BlockFile.FileSize();strcut } std::unordered_map LegacyDiskIndex; -- cgit v1.2.3 From bb7593c9ea3412a48b3d29f3e7f7b23d7a785b2f Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 16:41:32 +0200 Subject: Refactor WriteChunk to not need callback --- zenserver/cache/structuredcachestore.cpp | 46 ++++++++++++++------------------ 1 file changed, 20 insertions(+), 26 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 015912ce9..6f6f182b9 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -860,7 +860,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) { BasicFile BlockFile; BlockFile.Open(LegacyDataPath, CleanSource ? 
BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - BlockFileSize = BlockFile.FileSize();strcut + BlockFileSize = BlockFile.FileSize(); } std::unordered_map LegacyDiskIndex; @@ -1845,31 +1845,25 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const EntryFlags |= DiskLocation::kCompressed; } - uint64_t ChunkSize = Value.Value.Size(); - - m_BlockStore.WriteChunk(Value.Value.Data(), - ChunkSize, - m_PayloadAlignment, - [this, &HashKey, EntryFlags](const BlockStoreLocation& BlockStoreLocation) { - DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); - const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; - m_SlogFile.Append(DiskIndexEntry); - m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order_seq_cst); - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - IndexEntry& Entry = It.value(); - Entry.Location = Location; - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - } - else - { - m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); - } - }); + BlockStoreLocation BlockStoreLocation = m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment); + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; + m_SlogFile.Append(DiskIndexEntry); + m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order_seq_cst); + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + // TODO: should check if write is idempotent and bail out if it is? 
+ // this would requiring comparing contents on disk unless we add a + // content hash to the index entry + IndexEntry& Entry = It.value(); + Entry.Location = Location; + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + } + else + { + m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); + } } ////////////////////////////////////////////////////////////////////////// -- cgit v1.2.3 From 0599d52d80beb85e50ffe1f56c8434376a8c08a2 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 16:50:08 +0200 Subject: simplify lambda captures --- zenserver/cache/structuredcachestore.cpp | 46 ++++++++++++-------------------- 1 file changed, 17 insertions(+), 29 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 6f6f182b9..46f9d8fe6 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -366,8 +366,8 @@ void ZenCacheStore::GatherReferences(GcContext& GcCtx) { Stopwatch Timer; - const auto Guard = MakeGuard( - [this, &Timer] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const auto Guard = + MakeGuard([&] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); access_tracking::AccessTimes AccessTimes; m_MemLayer.GatherAccessTimes(AccessTimes); @@ -650,7 +650,7 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName); uint64_t EntryCount = 0; Stopwatch Timer; - const auto _ = MakeGuard([this, &EntryCount, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", m_BucketDir / m_BucketName, EntryCount, @@ -734,7 +734,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile() if (std::filesystem::is_regular_file(IndexPath)) 
{ Stopwatch Timer; - const auto _ = MakeGuard([this, &Entries, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing #{} entries in {}", m_BucketDir / m_BucketName, Entries.size(), @@ -787,7 +787,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) if (std::filesystem::is_regular_file(LogPath)) { Stopwatch Timer; - const auto _ = MakeGuard([LogPath, &Entries, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; @@ -847,7 +847,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) uint32_t MigratedBlockCount = 0; Stopwatch MigrationTimer; uint64_t TotalSize = 0; - const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { + const auto _ = MakeGuard([&] { ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", m_BucketDir / m_BucketName, MigratedChunkCount, @@ -871,7 +871,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) LegacyCasLog.Open(LegacyLogPath, CleanSource ? 
CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); { Stopwatch Timer; - const auto __ = MakeGuard([LegacyLogPath, &LegacyDiskIndex, &Timer] { + const auto __ = MakeGuard([&] { ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", LegacyLogPath, LegacyDiskIndex.size(), @@ -1317,7 +1317,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) uint64_t ReadBlockLongestTimeUs = 0; Stopwatch TotalTimer; - const auto _ = MakeGuard([this, &TotalTimer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + const auto _ = MakeGuard([&] { ZEN_INFO("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", m_BucketDir / m_BucketName, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), @@ -1336,7 +1336,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); @@ -1373,7 +1373,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); @@ -1421,17 +1421,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) uint64_t DeletedCount = 0; uint64_t MovedCount = 0; - const auto _ = MakeGuard([this, - &TotalTimer, - &WriteBlockTimeUs, - &WriteBlockLongestTimeUs, - &ReadBlockTimeUs, - &ReadBlockLongestTimeUs, - &TotalChunkCount, - &DeletedCount, - &MovedCount, - &DeletedSize, - &OldTotalSize] { + const auto _ = 
MakeGuard([&] { ZEN_INFO( "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " "#{} " @@ -1476,7 +1466,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; - const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); @@ -1530,7 +1520,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; - const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); @@ -1554,7 +1544,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); @@ -1638,9 +1628,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) KeepChunkIndexes, m_PayloadAlignment, false, - [this, &DeletedChunks, &ChunkIndexToChunkHash, &Index, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( - const BlockStore::MovedChunksArray& MovedChunks, - const BlockStore::ChunkIndexArray& RemovedChunks) { + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); for (const auto& Entry : MovedChunks) @@ -1668,7 +1656,7 
@@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; - const auto ____ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); @@ -1686,7 +1674,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } } }, - [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); + [&]() { return GcCtx.CollectSmallObjects(); }); GcCtx.DeletedCas(DeletedChunks); } -- cgit v1.2.3 From ae8505ad0af6375289c83b6455796e0c91609dc9 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 16:53:21 +0200 Subject: Make sure we close all block files when dropping a cache bucket --- zenserver/cache/structuredcachestore.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 46f9d8fe6..a9e9b8f78 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1202,8 +1202,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& void ZenCacheDiskLayer::CacheBucket::Drop() { - // TODO: close all open files and manage locking - // TODO: add error handling + m_BlockStore.Close(); m_SlogFile.Close(); DeleteDirectories(m_BucketDir); } -- cgit v1.2.3 From 1e3da36ef01f6b823febf22645a314267353a223 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 May 2022 17:06:30 +0200 Subject: switched back memory_order for m_TotalSize to relaxed --- zenserver/cache/structuredcachestore.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp 
b/zenserver/cache/structuredcachestore.cpp index a9e9b8f78..ae3b401a5 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -567,7 +567,7 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue m_CacheMap.insert_or_assign(HashKey, BucketValue(Value.Value, GcClock::TickCount())); } - m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order_seq_cst); + m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// @@ -1088,7 +1088,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is for (const auto& Entry : m_Index) { const DiskLocation& Location = Entry.second.Location; - m_TotalSize.fetch_add(Location.Size(), std::memory_order_seq_cst); + m_TotalSize.fetch_add(Location.Size(), std::memory_order::relaxed); if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { continue; @@ -1554,10 +1554,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); m_Index.insert({Key, {Loc, GcClock::TickCount()}}); - m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order_seq_cst); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); continue; } - m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order_seq_cst); + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); DeletedSize += Entry.Location.Size(); DeletedCount++; } @@ -1815,7 +1815,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order_seq_cst); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); } void @@ -1836,7 +1836,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const DiskLocation 
Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; m_SlogFile.Append(DiskIndexEntry); - m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order_seq_cst); + m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed); RwLock::ExclusiveLockScope __(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) { -- cgit v1.2.3 From 78e582a60763c0d9499106be0cdfe6a794e26e42 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 3 May 2022 22:14:02 +0200 Subject: macos compilation fix --- zenserver/cache/structuredcachestore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index ae3b401a5..3ba75cd9c 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1025,7 +1025,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) OldLocation.Size(), 0, OldLocation.Flags() | LegacyDiskLocation::kTombStone); - LegacyLogEntries.push_back(LegacyDiskIndexEntry(ChunkHash, NewLocation)); + LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation}); } LegacyCasLog.Append(LegacyLogEntries); LegacyCasLog.Flush(); -- cgit v1.2.3 From a19eee841d7ce0c9c868dced40a6380f55cdb9bd Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 3 May 2022 23:04:45 +0200 Subject: handle that more than one block can be written to in parallel --- zenserver/cache/structuredcachestore.cpp | 39 ++++++++++++++++---------------- 1 file changed, 20 insertions(+), 19 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 3ba75cd9c..2869191fd 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ 
b/zenserver/cache/structuredcachestore.cpp @@ -1832,25 +1832,26 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const EntryFlags |= DiskLocation::kCompressed; } - BlockStoreLocation BlockStoreLocation = m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment); - DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); - const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; - m_SlogFile.Append(DiskIndexEntry); - m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed); - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - IndexEntry& Entry = It.value(); - Entry.Location = Location; - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - } - else - { - m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); - } + m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](BlockStoreLocation BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; + m_SlogFile.Append(DiskIndexEntry); + m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed); + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + // TODO: should check if write is idempotent and bail out if it is? 
+ // this would requiring comparing contents on disk unless we add a + // content hash to the index entry + IndexEntry& Entry = It.value(); + Entry.Location = Location; + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + } + else + { + m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); + } + }); } ////////////////////////////////////////////////////////////////////////// -- cgit v1.2.3 From ef12415d287c9307c0c4774aeacff6c91966f693 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 4 May 2022 19:45:57 +0200 Subject: cleanup --- zenserver/cache/structuredcachestore.cpp | 256 +++++++++++++++++++++++++++---- 1 file changed, 225 insertions(+), 31 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 2869191fd..075b7d408 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -232,7 +232,7 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) WriteFile(Path, Object.GetBuffer().AsIoBuffer()); } -ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) +ZenCacheNamespace::ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir) : GcStorage(Gc) , GcContributor(Gc) , m_RootDir(RootDir) @@ -248,12 +248,12 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) #endif } -ZenCacheStore::~ZenCacheStore() +ZenCacheNamespace::~ZenCacheNamespace() { } bool -ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { ZEN_TRACE_CPU("Z$::Get"); @@ -291,7 +291,7 @@ ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheVal } void -ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) 
+ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { ZEN_TRACE_CPU("Z$::Put"); @@ -327,7 +327,7 @@ ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCa } bool -ZenCacheStore::DropBucket(std::string_view Bucket) +ZenCacheNamespace::DropBucket(std::string_view Bucket) { ZEN_INFO("dropping bucket '{}'", Bucket); @@ -343,13 +343,13 @@ ZenCacheStore::DropBucket(std::string_view Bucket) } void -ZenCacheStore::Flush() +ZenCacheNamespace::Flush() { m_DiskLayer.Flush(); } void -ZenCacheStore::Scrub(ScrubContext& Ctx) +ZenCacheNamespace::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { @@ -363,7 +363,7 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) } void -ZenCacheStore::GatherReferences(GcContext& GcCtx) +ZenCacheNamespace::GatherReferences(GcContext& GcCtx) { Stopwatch Timer; const auto Guard = @@ -377,14 +377,14 @@ ZenCacheStore::GatherReferences(GcContext& GcCtx) } void -ZenCacheStore::CollectGarbage(GcContext& GcCtx) +ZenCacheNamespace::CollectGarbage(GcContext& GcCtx) { m_MemLayer.Reset(); m_DiskLayer.CollectGarbage(GcCtx); } GcStorageSize -ZenCacheStore::StorageSize() const +ZenCacheNamespace::StorageSize() const { return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()}; } @@ -2098,6 +2098,200 @@ ZenCacheDiskLayer::TotalSize() const return TotalSize; } +//////////////////////////// ZenCacheStore + +const char* ZenCacheNamespaceDirPrefix = "ns_"; + +namespace { + + std::vector FindExistingFolders(const std::filesystem::path& RootPath) + { + FileSystemTraversal Traversal; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override {} + + virtual bool VisitDirectory(const std::filesystem::path&, const path_view& DirectoryName) override + { + std::string DirName8 = WideToUtf8(DirectoryName); + Dirs.push_back(DirName8); + return false; + } + + std::vector 
Dirs; + } Visit; + + Traversal.TraverseFileSystem(RootPath, Visit); + return Visit.Dirs; + } + +} // namespace + +ZenCacheStore::ZenCacheStore(std::filesystem::path BasePath, CasGc& Gc) : GcStorage(Gc), GcContributor(Gc) +{ + CreateDirectories(BasePath); + std::vector ExistingFolders = FindExistingFolders(BasePath); + + std::vector LegacyBuckets; + std::vector Namespaces; + for (const std::string& DirName : ExistingFolders) + { + if (DirName.starts_with(ZenCacheNamespaceDirPrefix)) + { + Namespaces.push_back(DirName.substr(3)); + continue; + } + LegacyBuckets.push_back(DirName); + } + + ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), BasePath, LegacyBuckets.size()); + + if (std::find(Namespaces.begin(), Namespaces.end(), "") == Namespaces.end()) + { + ZEN_INFO("Moving #{} legacy buckets to anonymous namespace", LegacyBuckets.size()); + std::filesystem::path DefaultNamespaceFolder = BasePath / ZenCacheNamespaceDirPrefix; + CreateDirectories(DefaultNamespaceFolder); + + // Move any non-namespace folders into the default namespace folder + for (const std::string& DirName : LegacyBuckets) + { + std::filesystem::path LegacyFolder = BasePath / DirName; + std::filesystem::path NewPath = DefaultNamespaceFolder / DirName; + std::filesystem::rename(LegacyFolder, NewPath); + } + Namespaces.push_back(""); + } + + for (const std::string& NamespaceName : Namespaces) + { + Ref Store = new ZenCacheNamespace(Gc, BasePath / (ZenCacheNamespaceDirPrefix + NamespaceName)); + m_Namespaces[NamespaceName] = Store; + } +} + +ZenCacheStore::~ZenCacheStore() +{ + m_Namespaces.clear(); +} + +bool +ZenCacheStore::Get(const std::string& Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) +{ + Ref Store = GetStore(Namespace); + if (!Store) + { + return false; + } + return Store->Get(Bucket, HashKey, OutValue); +} + +void +ZenCacheStore::Put(const std::string& Namespace, std::string_view Bucket, const IoHash& HashKey, const 
ZenCacheValue& Value) +{ + Ref Store = GetStore(Namespace); + if (!Store) + { + return; + } + Store->Put(Bucket, HashKey, Value); +} + +bool +ZenCacheStore::DropBucket(const std::string& Namespace, std::string_view Bucket) +{ + Ref Store = GetStore(Namespace); + if (!Store) + { + return false; + } + return Store->DropBucket(Bucket); +} + +void +ZenCacheStore::Flush() +{ + std::vector> Stores; + RwLock::SharedLockScope _(m_NamespacesLock); + Stores.reserve(m_Namespaces.size()); + for (const auto& Entry : m_Namespaces) + { + Stores.push_back(Entry.second); + } + _.ReleaseNow(); + for (const Ref& Store : Stores) + { + Store->Flush(); + } +} + +void +ZenCacheStore::Scrub(ScrubContext& Ctx) +{ + std::vector> Stores = GetAllStores(); + for (const Ref& Store : Stores) + { + Store->Scrub(Ctx); + } +} + +Ref +ZenCacheStore::GetStore(const std::string& Namespace) +{ + RwLock::SharedLockScope _(m_NamespacesLock); + if (auto It = m_Namespaces.find(Namespace); It != m_Namespaces.end()) + { + return It->second; + } + return nullptr; +} + +std::vector> +ZenCacheStore::GetAllStores() const +{ + std::vector> Stores; + RwLock::SharedLockScope _(m_NamespacesLock); + Stores.reserve(m_Namespaces.size()); + for (const auto& Entry : m_Namespaces) + { + Stores.push_back(Entry.second); + } + return Stores; +} + +void +ZenCacheStore::GatherReferences(GcContext& GcCtx) +{ + std::vector> Stores = GetAllStores(); + for (const Ref& Store : Stores) + { + Store->GatherReferences(GcCtx); + } +} + +void +ZenCacheStore::CollectGarbage(GcContext& GcCtx) +{ + std::vector> Stores = GetAllStores(); + for (const Ref& Store : Stores) + { + Store->CollectGarbage(GcCtx); + } +} + +GcStorageSize +ZenCacheStore::StorageSize() const +{ + std::vector> Stores = GetAllStores(); + GcStorageSize Size; + for (const Ref& Store : Stores) + { + GcStorageSize StoreSize = Store->StorageSize(); + Size.MemorySize += StoreSize.MemorySize; + Size.DiskSize += StoreSize.DiskSize; + } + return Size; +} + 
////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS @@ -2136,7 +2330,7 @@ TEST_CASE("z$.store") CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const int kIterationCount = 100; @@ -2189,8 +2383,8 @@ TEST_CASE("z$.size") GcStorageSize CacheSize; { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); @@ -2209,8 +2403,8 @@ TEST_CASE("z$.size") } { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); @@ -2232,8 +2426,8 @@ TEST_CASE("z$.size") GcStorageSize CacheSize; { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); @@ -2252,8 +2446,8 @@ TEST_CASE("z$.size") } { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); @@ -2290,9 +2484,9 @@ TEST_CASE("z$.gc") }; { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); - const auto Bucket = "teardrinker"sv; + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "teardrinker"sv; // Create a cache record const IoHash Key = CreateKey(42); @@ -2328,7 +2522,7 @@ TEST_CASE("z$.gc") // Expect timestamps to be serialized { CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); std::vector Keep; // Collect garbage with 1 hour max cache duration @@ -2349,7 +2543,7 @@ TEST_CASE("z$.gc") { 
ScopedTemporaryDirectory TempDir; CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "fortysixandtwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); @@ -2397,7 +2591,7 @@ TEST_CASE("z$.gc") { ScopedTemporaryDirectory TempDir; CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "rightintwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); @@ -2490,7 +2684,7 @@ TEST_CASE("z$.legacyconversion") const std::string Bucket = "rightintwo"; { CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path()); + ZenCacheNamespace Zcs(Gc, TempDir.Path()); const GcClock::TimePoint CurrentTime = GcClock::Now(); for (size_t i = 0; i < ChunkCount; i++) @@ -2578,8 +2772,8 @@ TEST_CASE("z$.legacyconversion") std::filesystem::remove(IndexPath); { - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path()); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path()); for (size_t i = 0; i < ChunkCount; i += 2) { @@ -2639,9 +2833,9 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) CreateDirectories(TempDir.Path()); - WorkerThreadPool ThreadPool(4); - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path()); + WorkerThreadPool ThreadPool(4); + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path()); { std::atomic WorkCompleted = 0; -- cgit v1.2.3 From 861a92d1ee6c54eeb9035190501baf8ea888591f Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 5 May 2022 09:55:09 +0200 Subject: cleanup and review feedback --- zenserver/cache/structuredcachestore.cpp | 33 +++++++++++++------------------- 1 file changed, 13 insertions(+), 20 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 075b7d408..7db18a7bb 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2100,7 
+2100,7 @@ ZenCacheDiskLayer::TotalSize() const //////////////////////////// ZenCacheStore -const char* ZenCacheNamespaceDirPrefix = "ns_"; +static constexpr std::string_view ZenCacheNamespaceDirPrefix = "ns_"; namespace { @@ -2146,10 +2146,10 @@ ZenCacheStore::ZenCacheStore(std::filesystem::path BasePath, CasGc& Gc) : GcStor ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), BasePath, LegacyBuckets.size()); - if (std::find(Namespaces.begin(), Namespaces.end(), "") == Namespaces.end()) + if (std::find(Namespaces.begin(), Namespaces.end(), DefaultNamespace) == Namespaces.end()) { ZEN_INFO("Moving #{} legacy buckets to anonymous namespace", LegacyBuckets.size()); - std::filesystem::path DefaultNamespaceFolder = BasePath / ZenCacheNamespaceDirPrefix; + std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", ZenCacheNamespaceDirPrefix, DefaultNamespace); CreateDirectories(DefaultNamespaceFolder); // Move any non-namespace folders into the default namespace folder @@ -2159,12 +2159,12 @@ ZenCacheStore::ZenCacheStore(std::filesystem::path BasePath, CasGc& Gc) : GcStor std::filesystem::path NewPath = DefaultNamespaceFolder / DirName; std::filesystem::rename(LegacyFolder, NewPath); } - Namespaces.push_back(""); + Namespaces.push_back(std::string(DefaultNamespace)); } for (const std::string& NamespaceName : Namespaces) { - Ref Store = new ZenCacheNamespace(Gc, BasePath / (ZenCacheNamespaceDirPrefix + NamespaceName)); + Ref Store = new ZenCacheNamespace(Gc, BasePath / fmt::format("{}{}", ZenCacheNamespaceDirPrefix, NamespaceName)); m_Namespaces[NamespaceName] = Store; } } @@ -2175,7 +2175,7 @@ ZenCacheStore::~ZenCacheStore() } bool -ZenCacheStore::Get(const std::string& Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) { Ref Store = GetStore(Namespace); if (!Store) @@ 
-2186,9 +2186,9 @@ ZenCacheStore::Get(const std::string& Namespace, std::string_view Bucket, const } void -ZenCacheStore::Put(const std::string& Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) { - Ref Store = GetStore(Namespace); + Ref Store = GetStore(std::string(Namespace)); if (!Store) { return; @@ -2197,9 +2197,9 @@ ZenCacheStore::Put(const std::string& Namespace, std::string_view Bucket, const } bool -ZenCacheStore::DropBucket(const std::string& Namespace, std::string_view Bucket) +ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) { - Ref Store = GetStore(Namespace); + Ref Store = GetStore(std::string(Namespace)); if (!Store) { return false; @@ -2210,14 +2210,7 @@ ZenCacheStore::DropBucket(const std::string& Namespace, std::string_view Bucket) void ZenCacheStore::Flush() { - std::vector> Stores; - RwLock::SharedLockScope _(m_NamespacesLock); - Stores.reserve(m_Namespaces.size()); - for (const auto& Entry : m_Namespaces) - { - Stores.push_back(Entry.second); - } - _.ReleaseNow(); + std::vector> Stores = GetAllStores(); for (const Ref& Store : Stores) { Store->Flush(); @@ -2235,10 +2228,10 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) } Ref -ZenCacheStore::GetStore(const std::string& Namespace) +ZenCacheStore::GetStore(std::string_view Namespace) { RwLock::SharedLockScope _(m_NamespacesLock); - if (auto It = m_Namespaces.find(Namespace); It != m_Namespaces.end()) + if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) { return It->second; } -- cgit v1.2.3 From 7b842505d25fcd8f0c52656c608c1a66f45ccf96 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 5 May 2022 10:16:39 +0200 Subject: mac/linux build fix --- zenserver/cache/structuredcachestore.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 
'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 7db18a7bb..23ad550c9 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2113,8 +2113,12 @@ namespace { virtual bool VisitDirectory(const std::filesystem::path&, const path_view& DirectoryName) override { - std::string DirName8 = WideToUtf8(DirectoryName); - Dirs.push_back(DirName8); +#if ZEN_PLATFORM_WINDOWS + std::string DirectoryName8 = WideToUtf8(DirectoryName); +#else + std::string DirectoryName8 = std::string(DirectoryName); +#endif + Dirs.push_back(DirectoryName8); return false; } -- cgit v1.2.3 From d484acb3d32662c9e1faf9a99efad543f607732a Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 5 May 2022 10:49:35 +0200 Subject: revert back constructor order for ZenCacheStore --- zenserver/cache/structuredcachestore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 23ad550c9..a734e9eb1 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2131,7 +2131,7 @@ namespace { } // namespace -ZenCacheStore::ZenCacheStore(std::filesystem::path BasePath, CasGc& Gc) : GcStorage(Gc), GcContributor(Gc) +ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc) { CreateDirectories(BasePath); std::vector ExistingFolders = FindExistingFolders(BasePath); -- cgit v1.2.3 From e4b96fade542151fca17b5ac61e3eaad263ce92c Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 6 May 2022 11:53:11 +0200 Subject: Added GetDirectoryContent utility --- zenserver/cache/structuredcachestore.cpp | 78 ++++++-------------------------- 1 file changed, 13 insertions(+), 65 deletions(-) (limited to 
'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index a734e9eb1..da48253e2 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1956,49 +1956,23 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z void ZenCacheDiskLayer::DiscoverBuckets() { - FileSystemTraversal Traversal; - struct Visitor : public FileSystemTraversal::TreeVisitor - { - virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& File, - [[maybe_unused]] uint64_t FileSize) override - { - } - - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override - { - Dirs.push_back((decltype(Dirs)::value_type)(DirectoryName)); - return false; - } - - std::vector Dirs; - } Visit; - - Traversal.TraverseFileSystem(m_RootDir, Visit); + DirectoryContent DirContent; + GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); // Initialize buckets RwLock::ExclusiveLockScope _(m_Lock); - for (const auto& BucketName : Visit.Dirs) + for (const std::filesystem::path& BucketPath : DirContent.Directories) { + std::string BucketName = PathToUtf8(BucketPath.stem()); // New bucket needs to be created - -#if ZEN_PLATFORM_WINDOWS - std::string BucketName8 = WideToUtf8(BucketName); -#else - const auto& BucketName8 = BucketName; -#endif - - if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end()) + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { } else { - auto InsertResult = m_Buckets.try_emplace(BucketName8, BucketName8); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName8; + auto InsertResult = m_Buckets.try_emplace(BucketName, BucketName); CacheBucket& Bucket = InsertResult.first->second; @@ -2006,11 +1980,11 @@ ZenCacheDiskLayer::DiscoverBuckets() if 
(Bucket.IsOk()) { - ZEN_INFO("Discovered bucket '{}'", BucketName8); + ZEN_INFO("Discovered bucket '{}'", BucketName); } else { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir); + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); m_Buckets.erase(InsertResult.first); } @@ -2102,44 +2076,18 @@ ZenCacheDiskLayer::TotalSize() const static constexpr std::string_view ZenCacheNamespaceDirPrefix = "ns_"; -namespace { - - std::vector FindExistingFolders(const std::filesystem::path& RootPath) - { - FileSystemTraversal Traversal; - struct Visitor : public FileSystemTraversal::TreeVisitor - { - virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override {} - - virtual bool VisitDirectory(const std::filesystem::path&, const path_view& DirectoryName) override - { -#if ZEN_PLATFORM_WINDOWS - std::string DirectoryName8 = WideToUtf8(DirectoryName); -#else - std::string DirectoryName8 = std::string(DirectoryName); -#endif - Dirs.push_back(DirectoryName8); - return false; - } - - std::vector Dirs; - } Visit; - - Traversal.TraverseFileSystem(RootPath, Visit); - return Visit.Dirs; - } - -} // namespace - ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc) { CreateDirectories(BasePath); - std::vector ExistingFolders = FindExistingFolders(BasePath); + + DirectoryContent DirContent; + GetDirectoryContent(BasePath, DirectoryContent::IncludeDirsFlag, DirContent); std::vector LegacyBuckets; std::vector Namespaces; - for (const std::string& DirName : ExistingFolders) + for (const std::filesystem::path& DirPath : DirContent.Directories) { + std::string DirName = PathToUtf8(DirPath.stem()); if (DirName.starts_with(ZenCacheNamespaceDirPrefix)) { Namespaces.push_back(DirName.substr(3)); -- cgit v1.2.3 From 6db10b5a491297d45c14efae453c420f0d7fa58c Mon Sep 17 00:00:00 2001 From: Dan 
Engelbrecht Date: Fri, 6 May 2022 12:12:09 +0200 Subject: review feedback and cleanup --- zenserver/cache/structuredcachestore.cpp | 81 ++++++++++++++------------------ 1 file changed, 36 insertions(+), 45 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index da48253e2..3ac319961 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2109,7 +2109,12 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor { std::filesystem::path LegacyFolder = BasePath / DirName; std::filesystem::path NewPath = DefaultNamespaceFolder / DirName; - std::filesystem::rename(LegacyFolder, NewPath); + std::error_code Ec; + std::filesystem::rename(LegacyFolder, NewPath, Ec); + if (Ec) + { + ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message()); + } } Namespaces.push_back(std::string(DefaultNamespace)); } @@ -2129,54 +2134,45 @@ ZenCacheStore::~ZenCacheStore() bool ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) { - Ref Store = GetStore(Namespace); - if (!Store) + if (Ref Store = GetStore(Namespace); Store) { - return false; + return Store->Get(Bucket, HashKey, OutValue); } - return Store->Get(Bucket, HashKey, OutValue); + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); + return false; } void ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) { - Ref Store = GetStore(std::string(Namespace)); - if (!Store) + if (Ref Store = GetStore(Namespace); Store) { - return; + return Store->Put(Bucket, HashKey, Value); } - Store->Put(Bucket, HashKey, Value); + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key 
'{}'", Namespace, Bucket, HashKey.ToHexString()); } bool ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) { - Ref Store = GetStore(std::string(Namespace)); - if (!Store) + if (Ref Store = GetStore(Namespace); Store) { - return false; + return Store->DropBucket(Bucket); } - return Store->DropBucket(Bucket); + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}'", Namespace, Bucket); + return false; } void ZenCacheStore::Flush() { - std::vector> Stores = GetAllStores(); - for (const Ref& Store : Stores) - { - Store->Flush(); - } + IterateStores([&](const Ref& Store) { Store->Flush(); }); } void ZenCacheStore::Scrub(ScrubContext& Ctx) { - std::vector> Stores = GetAllStores(); - for (const Ref& Store : Stores) - { - Store->Scrub(Ctx); - } + IterateStores([&](const Ref& Store) { Store->Scrub(Ctx); }); } Ref @@ -2190,50 +2186,45 @@ ZenCacheStore::GetStore(std::string_view Namespace) return nullptr; } -std::vector> -ZenCacheStore::GetAllStores() const +void +ZenCacheStore::IterateStores(const std::function& Store)>& Callback) const { std::vector> Stores; - RwLock::SharedLockScope _(m_NamespacesLock); - Stores.reserve(m_Namespaces.size()); - for (const auto& Entry : m_Namespaces) { - Stores.push_back(Entry.second); + RwLock::SharedLockScope _(m_NamespacesLock); + Stores.reserve(m_Namespaces.size()); + for (const auto& Entry : m_Namespaces) + { + Stores.push_back(Entry.second); + } + } + for (const Ref& Store : Stores) + { + Callback(Store); } - return Stores; } void ZenCacheStore::GatherReferences(GcContext& GcCtx) { - std::vector> Stores = GetAllStores(); - for (const Ref& Store : Stores) - { - Store->GatherReferences(GcCtx); - } + IterateStores([&](const Ref& Store) { Store->GatherReferences(GcCtx); }); } void ZenCacheStore::CollectGarbage(GcContext& GcCtx) { - std::vector> Stores = GetAllStores(); - for (const Ref& Store : Stores) - { - Store->CollectGarbage(GcCtx); - } + IterateStores([&](const Ref& Store) { 
Store->CollectGarbage(GcCtx); }); } GcStorageSize ZenCacheStore::StorageSize() const { - std::vector> Stores = GetAllStores(); - GcStorageSize Size; - for (const Ref& Store : Stores) - { + GcStorageSize Size; + IterateStores([&](const Ref& Store) { GcStorageSize StoreSize = Store->StorageSize(); Size.MemorySize += StoreSize.MemorySize; Size.DiskSize += StoreSize.DiskSize; - } + }); return Size; } -- cgit v1.2.3 From 76fd97b9d864ab60d06859befdd4a3a3bf4abd97 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 6 May 2022 12:55:04 +0200 Subject: Fix standalone file lock in CacheBucket Grab sharding lock when deleting files during GC Don't hold sharding lock when sleeping in back-off due to file contention Remove unneeded renaming logic when writing standalone cache values --- zenserver/cache/structuredcachestore.cpp | 170 ++++++++++++++++++------------- 1 file changed, 97 insertions(+), 73 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 2869191fd..163a3f2f2 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1515,6 +1515,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) Path.Reset(); BuildPath(Path, Key); + fs::path FilePath = Path.ToPath(); { RwLock::SharedLockScope __(m_IndexLock); @@ -1530,8 +1531,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } - ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); - fs::remove(Path.c_str(), Ec); + __.ReleaseNow(); + + RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); + if (fs::is_regular_file(FilePath)) + { + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + fs::remove(FilePath, Ec); + } } if (Ec) @@ -1722,100 +1729,117 @@ 
ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); + std::filesystem::path FsPath{DataFilePath.ToPath()}; - TemporaryFile DataFile; - - std::error_code Ec; - DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); + auto UpdateIndex = [&]() { + uint8_t EntryFlags = DiskLocation::kStandaloneFile; - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir)); - } + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } - DataFile.WriteAll(Value.Value, Ec); + RwLock::ExclusiveLockScope _(m_IndexLock); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to file", NiceBytes(Value.Value.Size()))); - } + DiskLocation Loc(Value.Value.Size(), EntryFlags); + IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); - // Move file into place (atomically) + if (auto It = m_Index.find(HashKey); It == m_Index.end()) + { + // Previously unknown object + m_Index.insert({HashKey, Entry}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? 
+ It.value() = Entry; + } - std::filesystem::path FsPath{DataFilePath.ToPath()}; + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + }; - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + std::error_code Ec; + BasicFile DataFile; - if (Ec) + // Happy path - directory structure exists and nobody is busy reading the file { - int RetryCount = 3; - - do + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + DataFile.Open(FsPath, BasicFile::Mode::kTruncate, Ec); + if (!Ec) { - std::filesystem::path ParentPath = FsPath.parent_path(); - CreateDirectories(ParentPath); - - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - - if (!Ec) - { - break; - } - - std::error_code InnerEc; - const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc); - - if (!InnerEc && ExistingFileSize == Value.Value.Size()) + DataFile.WriteAll(Value.Value, Ec); + if (Ec) { - // Concurrent write of same value? - return; + if (Ec) + { + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to file '{}' in '{}'", + NiceBytes(Value.Value.Size()), + FsPath, + m_BucketDir)); + } } + ValueLock.ReleaseNow(); + UpdateIndex(); + return; + } + } - // Semi arbitrary back-off - zen::Sleep(1000 * RetryCount); - } while (RetryCount--); - + std::filesystem::path ParentPath = FsPath.parent_path(); + if (!std::filesystem::is_directory(ParentPath)) + { + Ec.clear(); + std::filesystem::create_directories(ParentPath, Ec); if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to finalize file '{}'", DataFilePath.ToUtf8())); + throw std::system_error( + Ec, + fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); } } - // Update index - - uint8_t EntryFlags = DiskLocation::kStandaloneFile; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if 
(Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + // We retry to open the file since it can be held open for read + // This happens if the server gets a Get request for the file or + // if we are busy sending the file upstream + int RetryCount = 3; + do { - EntryFlags |= DiskLocation::kCompressed; - } - - RwLock::ExclusiveLockScope _(m_IndexLock); - - DiskLocation Loc(Value.Value.Size(), EntryFlags); - IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); + Ec.clear(); + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + DataFile.Open(FsPath, BasicFile::Mode::kTruncate, Ec); + if (!Ec) + { + DataFile.WriteAll(Value.Value, Ec); + if (Ec) + { + if (Ec) + { + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to file '{}' in '{}'", + NiceBytes(Value.Value.Size()), + FsPath, + m_BucketDir)); + } + } + ValueLock.ReleaseNow(); + UpdateIndex(); + return; + } + ZEN_INFO("Failed writing opening file '{}' for writing, pausing and retrying, reason '{}'", FsPath.string(), Ec.message()); + ValueLock.ReleaseNow(); - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - m_Index.insert({HashKey, Entry}); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? 
- It.value() = Entry; - } + // Semi arbitrary back-off + zen::Sleep(200 * (4 - RetryCount)); // Sleep at most for a total of 2 seconds + } while (RetryCount--); - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); } void -- cgit v1.2.3 From 1e279eb7700e7bfb35282cbc8acdaec3cb355e23 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 6 May 2022 13:23:52 +0200 Subject: clean up file on failed write --- zenserver/cache/structuredcachestore.cpp | 73 ++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 32 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 163a3f2f2..77307bc2d 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1765,27 +1765,41 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); }; - std::error_code Ec; - BasicFile DataFile; + auto WritePayload = [&](BasicFile& File, const IoBuffer& Payload) { + std::error_code Ec; + File.WriteAll(Payload, Ec); + if (Ec) + { + File.Close(); + std::error_code RemoveEc; + std::filesystem::remove(FsPath, RemoveEc); + if (RemoveEc) + { + ZEN_WARN("Failed cleaning up file '{}' after failed write for put in '{}', reason '{}'", + FsPath.string(), + m_BucketDir, + RemoveEc.message()); + } + + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to file '{}' for put in '{}'", + NiceBytes(Payload.Size()), + FsPath, + m_BucketDir)); + } + File.Close(); + }; // Happy path - directory structure exists and nobody is busy reading the file { + std::error_code Ec; + BasicFile DataFile; + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); 
DataFile.Open(FsPath, BasicFile::Mode::kTruncate, Ec); if (!Ec) { - DataFile.WriteAll(Value.Value, Ec); - if (Ec) - { - if (Ec) - { - throw std::system_error(Ec, - fmt::format("Failed to write payload ({} bytes) to file '{}' in '{}'", - NiceBytes(Value.Value.Size()), - FsPath, - m_BucketDir)); - } - } + WritePayload(DataFile, Value.Value); ValueLock.ReleaseNow(); UpdateIndex(); return; @@ -1795,7 +1809,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c std::filesystem::path ParentPath = FsPath.parent_path(); if (!std::filesystem::is_directory(ParentPath)) { - Ec.clear(); + std::error_code Ec; std::filesystem::create_directories(ParentPath, Ec); if (Ec) { @@ -1805,41 +1819,36 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } } - // We retry to open the file since it can be held open for read - // This happens if the server gets a Get request for the file or + // We retry to open the file since it can be held open for read. 
+ // This happens if the server processes a Get request for the file or // if we are busy sending the file upstream - int RetryCount = 3; + int RetryCount = 3; + std::error_code Ec; do { + BasicFile DataFile; Ec.clear(); + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); DataFile.Open(FsPath, BasicFile::Mode::kTruncate, Ec); if (!Ec) { - DataFile.WriteAll(Value.Value, Ec); - if (Ec) - { - if (Ec) - { - throw std::system_error(Ec, - fmt::format("Failed to write payload ({} bytes) to file '{}' in '{}'", - NiceBytes(Value.Value.Size()), - FsPath, - m_BucketDir)); - } - } + WritePayload(DataFile, Value.Value); ValueLock.ReleaseNow(); UpdateIndex(); return; } - ZEN_INFO("Failed writing opening file '{}' for writing, pausing and retrying, reason '{}'", FsPath.string(), Ec.message()); + ZEN_INFO("Failed writing opening file '{}' for writing for put in '{}', pausing and retrying, reason '{}'", + FsPath.string(), + m_BucketDir, + Ec.message()); ValueLock.ReleaseNow(); // Semi arbitrary back-off zen::Sleep(200 * (4 - RetryCount)); // Sleep at most for a total of 2 seconds } while (RetryCount--); - throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); + throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); } void -- cgit v1.2.3 From 394e31f10d0ec32bb6dbe7ea9b7f3a1dc4edeec6 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 6 May 2022 17:40:31 +0200 Subject: restore write using rename in PutStandaloneCacheValue --- zenserver/cache/structuredcachestore.cpp | 150 +++++++++++++------------------ 1 file changed, 61 insertions(+), 89 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 77307bc2d..dee4c55f0 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ 
-1729,124 +1729,96 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { + TemporaryFile DataFile; + + std::error_code Ec; + DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir)); + } + + DataFile.WriteAll(Value.Value, Ec); + if (Ec) + { + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to temporary file for put in '{}'", + NiceBytes(Value.Value.Size()), + m_BucketDir)); + } + ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; - auto UpdateIndex = [&]() { - uint8_t EntryFlags = DiskLocation::kStandaloneFile; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + // We retry to open the file since it can be held open for read. + // This happens if the server processes a Get request for the file or + // if we are busy sending the file upstream + int RetryCount = 3; + do + { + Ec.clear(); { - EntryFlags |= DiskLocation::kCompressed; + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); } - RwLock::ExclusiveLockScope _(m_IndexLock); - - DiskLocation Loc(Value.Value.Size(), EntryFlags); - IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); - - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - m_Index.insert({HashKey, Entry}); - } - else + if (!Ec) { - // TODO: should check if write is idempotent and bail out if it is? 
- It.value() = Entry; - } + uint8_t EntryFlags = DiskLocation::kStandaloneFile; - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); - }; - - auto WritePayload = [&](BasicFile& File, const IoBuffer& Payload) { - std::error_code Ec; - File.WriteAll(Payload, Ec); - if (Ec) - { - File.Close(); - std::error_code RemoveEc; - std::filesystem::remove(FsPath, RemoveEc); - if (RemoveEc) + if (Value.Value.GetContentType() == ZenContentType::kCbObject) { - ZEN_WARN("Failed cleaning up file '{}' after failed write for put in '{}', reason '{}'", - FsPath.string(), - m_BucketDir, - RemoveEc.message()); + EntryFlags |= DiskLocation::kStructured; } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + DiskLocation Loc(Value.Value.Size(), EntryFlags); + IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); - throw std::system_error(Ec, - fmt::format("Failed to write payload ({} bytes) to file '{}' for put in '{}'", - NiceBytes(Payload.Size()), - FsPath, - m_BucketDir)); - } - File.Close(); - }; - - // Happy path - directory structure exists and nobody is busy reading the file - { - std::error_code Ec; - BasicFile DataFile; + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It == m_Index.end()) + { + // Previously unknown object + m_Index.insert({HashKey, Entry}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? 
+ It.value() = Entry; + } - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - DataFile.Open(FsPath, BasicFile::Mode::kTruncate, Ec); - if (!Ec) - { - WritePayload(DataFile, Value.Value); - ValueLock.ReleaseNow(); - UpdateIndex(); + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); return; } - } - std::filesystem::path ParentPath = FsPath.parent_path(); - if (!std::filesystem::is_directory(ParentPath)) - { - std::error_code Ec; - std::filesystem::create_directories(ParentPath, Ec); - if (Ec) + std::filesystem::path ParentPath = FsPath.parent_path(); + if (!std::filesystem::is_directory(ParentPath)) { + Ec.clear(); + std::filesystem::create_directories(ParentPath, Ec); + if (!Ec) + { + // Retry without sleep + continue; + } throw std::system_error( Ec, fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); } - } - // We retry to open the file since it can be held open for read. 
- // This happens if the server processes a Get request for the file or - // if we are busy sending the file upstream - int RetryCount = 3; - std::error_code Ec; - do - { - BasicFile DataFile; - Ec.clear(); - - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - DataFile.Open(FsPath, BasicFile::Mode::kTruncate, Ec); - if (!Ec) - { - WritePayload(DataFile, Value.Value); - ValueLock.ReleaseNow(); - UpdateIndex(); - return; - } ZEN_INFO("Failed writing opening file '{}' for writing for put in '{}', pausing and retrying, reason '{}'", FsPath.string(), m_BucketDir, Ec.message()); - ValueLock.ReleaseNow(); // Semi arbitrary back-off zen::Sleep(200 * (4 - RetryCount)); // Sleep at most for a total of 2 seconds - } while (RetryCount--); + RetryCount--; + } while (RetryCount > 0); throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); } -- cgit v1.2.3 From 2f6461e3ed7851bc5592b6bc9efdfb0d973fe284 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 6 May 2022 23:32:35 +0200 Subject: remove use of Ref<> in ZenCacheStore naming cleanup --- zenserver/cache/structuredcachestore.cpp | 40 ++++++++++++++++---------------- 1 file changed, 20 insertions(+), 20 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 3ac319961..1d43e9591 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2121,8 +2121,8 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor for (const std::string& NamespaceName : Namespaces) { - Ref Store = new ZenCacheNamespace(Gc, BasePath / fmt::format("{}{}", ZenCacheNamespaceDirPrefix, NamespaceName)); - m_Namespaces[NamespaceName] = Store; + m_Namespaces[NamespaceName] = + std::make_unique(Gc, BasePath / fmt::format("{}{}", ZenCacheNamespaceDirPrefix, NamespaceName)); } } @@ -2134,7 
+2134,7 @@ ZenCacheStore::~ZenCacheStore() bool ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) { - if (Ref Store = GetStore(Namespace); Store) + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { return Store->Get(Bucket, HashKey, OutValue); } @@ -2145,7 +2145,7 @@ ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const Io void ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) { - if (Ref Store = GetStore(Namespace); Store) + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { return Store->Put(Bucket, HashKey, Value); } @@ -2155,7 +2155,7 @@ ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const Io bool ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) { - if (Ref Store = GetStore(Namespace); Store) + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { return Store->DropBucket(Bucket); } @@ -2166,62 +2166,62 @@ ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) void ZenCacheStore::Flush() { - IterateStores([&](const Ref& Store) { Store->Flush(); }); + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); } void ZenCacheStore::Scrub(ScrubContext& Ctx) { - IterateStores([&](const Ref& Store) { Store->Scrub(Ctx); }); + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); }); } -Ref -ZenCacheStore::GetStore(std::string_view Namespace) +ZenCacheNamespace* +ZenCacheStore::GetNamespace(std::string_view Namespace) { RwLock::SharedLockScope _(m_NamespacesLock); if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) { - return It->second; + return It->second.get(); } return nullptr; } void -ZenCacheStore::IterateStores(const std::function& Store)>& Callback) const 
+ZenCacheStore::IterateNamespaces(const std::function& Callback) const { - std::vector> Stores; + std::vector > Namespaces; { RwLock::SharedLockScope _(m_NamespacesLock); - Stores.reserve(m_Namespaces.size()); + Namespaces.reserve(m_Namespaces.size()); for (const auto& Entry : m_Namespaces) { - Stores.push_back(Entry.second); + Namespaces.push_back({Entry.first, *Entry.second}); } } - for (const Ref& Store : Stores) + for (auto& Entry : Namespaces) { - Callback(Store); + Callback(Entry.first, Entry.second); } } void ZenCacheStore::GatherReferences(GcContext& GcCtx) { - IterateStores([&](const Ref& Store) { Store->GatherReferences(GcCtx); }); + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.GatherReferences(GcCtx); }); } void ZenCacheStore::CollectGarbage(GcContext& GcCtx) { - IterateStores([&](const Ref& Store) { Store->CollectGarbage(GcCtx); }); + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.CollectGarbage(GcCtx); }); } GcStorageSize ZenCacheStore::StorageSize() const { GcStorageSize Size; - IterateStores([&](const Ref& Store) { - GcStorageSize StoreSize = Store->StorageSize(); + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { + GcStorageSize StoreSize = Store.StorageSize(); Size.MemorySize += StoreSize.MemorySize; Size.DiskSize += StoreSize.DiskSize; }); -- cgit v1.2.3 From 308d60e0289b2adc5c0738fe25273176e780735f Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Sun, 8 May 2022 00:11:11 +0200 Subject: Make sure blockstore owner and block store state does not get out of sync when fetching a chunk Move MarkAsDeleteOnClose() to IoBuffer(ExtendedCore) and set it on close, SetFileInformationByHandle sometimes fails if done in parallel with FileMapping --- zenserver/cache/structuredcachestore.cpp | 48 +++++++++++++++----------------- 1 file changed, 22 insertions(+), 26 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp 
b/zenserver/cache/structuredcachestore.cpp index 2869191fd..a929284b9 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1126,13 +1126,11 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen { BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); - Ref ChunkBlock = m_BlockStore.GetChunkBlock(Location); - if (!ChunkBlock) + OutValue.Value = m_BlockStore.TryGetChunk(Location); + if (!OutValue.Value) { return false; } - - OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size); OutValue.Value.SetContentType(Loc.GetContentType()); return true; @@ -1166,22 +1164,21 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } RwLock::SharedLockScope _(m_IndexLock); - - if (auto It = m_Index.find(HashKey); It != m_Index.end()) + auto It = m_Index.find(HashKey); + if (It == m_Index.end()) + { + return false; + } + IndexEntry& Entry = It.value(); + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + DiskLocation Location = Entry.Location; + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { - IndexEntry& Entry = It.value(); - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - DiskLocation Location = Entry.Location; + // We don't need to hold the index lock when we read a standalone file _.ReleaseNow(); - - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - return GetStandaloneCacheValue(Location, HashKey, OutValue); - } - return GetInlineCacheValue(Location, OutValue); + return GetStandaloneCacheValue(Location, HashKey, OutValue); } - - return false; + return GetInlineCacheValue(Location, OutValue); } void @@ -1470,14 +1467,13 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); + if (m_Index.empty()) { - if (m_Index.empty()) - { - ZEN_INFO("garbage 
collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); - return; - } - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); + return; } + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + SaveManifest(); Index = m_Index; @@ -1832,12 +1828,11 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const EntryFlags |= DiskLocation::kCompressed; } - m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](BlockStoreLocation BlockStoreLocation) { + m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; m_SlogFile.Append(DiskIndexEntry); - m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed); - RwLock::ExclusiveLockScope __(m_IndexLock); + RwLock::ExclusiveLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) { // TODO: should check if write is idempotent and bail out if it is? 
@@ -1852,6 +1847,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); } }); + m_TotalSize.fetch_add(Value.Value.Size(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// -- cgit v1.2.3 From e177d1005c73512112b2ff6ab6f34e5d327c127b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 9 May 2022 08:31:46 +0200 Subject: fix exception message/logging --- zenserver/cache/structuredcachestore.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index dee4c55f0..f69ffd82c 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1735,15 +1735,16 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir)); + throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } DataFile.WriteAll(Value.Value, Ec); if (Ec) { throw std::system_error(Ec, - fmt::format("Failed to write payload ({} bytes) to temporary file for put in '{}'", + fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", NiceBytes(Value.Value.Size()), + DataFile.GetPath().string(), m_BucketDir)); } @@ -1775,6 +1776,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c { EntryFlags |= DiskLocation::kCompressed; } + DiskLocation Loc(Value.Value.Size(), EntryFlags); IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); @@ -1810,7 +1812,8 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c fmt::format("Failed to create parent 
directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); } - ZEN_INFO("Failed writing opening file '{}' for writing for put in '{}', pausing and retrying, reason '{}'", + ZEN_INFO("Failed renaming temporary file '{}' to '{}' for put in '{}', pausing and retrying, reason '{}'", + DataFile.GetPath().string(), FsPath.string(), m_BucketDir, Ec.message()); -- cgit v1.2.3 From 5ef2b317ef1965121ab0090d86962d3eea4a357e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 9 May 2022 22:03:45 +0200 Subject: Make sure CacheBucket::PutStandaloneCacheValue cleans up the temp file if we fail to move the it into place --- zenserver/cache/structuredcachestore.cpp | 48 ++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 05c80c5bf..411717e61 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1819,6 +1819,14 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c RetryCount--; } while (RetryCount > 0); + // Once we have called MoveTemporaryIntoPlace we no longer will automatically clean up the temp file + // as the file handle has already been closed + std::filesystem::remove(DataFile.GetPath(), Ec); + if (Ec) + { + ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", DataFile.GetPath(), m_BucketDir, Ec.message()); + } + throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); } @@ -2971,6 +2979,46 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } +# if ZEN_PLATFORM_WINDOWS +TEST_CASE("z$.blocked.disklayer.put") +{ + // On Windows platform we can't overwrite a standalone file that + // is open for read at the same time. 
+ // Make sure the retry path runs and we get an exception + + ScopedTemporaryDirectory TempDir; + + GcStorageSize CacheSize; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + CasGc Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + + CbObject CacheValue = CreateCacheValue(64 * 1024 + 64); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + size_t Key = Buffer.Size(); + IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer}); + + ZenCacheValue BufferGet; + CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); + + MemoryView ValueView = BufferGet.Value.GetView(); + CHECK_THROWS(Zcs.Put("test_bucket", HashKey, {.Value = Buffer})); +} +# endif + #endif void -- cgit v1.2.3 From 5a872e2c699b439e3e5e95fe1c1882c8a0ca92dd Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 9 May 2022 22:32:17 +0200 Subject: Restore logic where we accept failed overwrite if resulting size is the same Correctly calculate the m_TotalSize difference when overwriting file --- zenserver/cache/structuredcachestore.cpp | 47 ++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 411717e61..b6fd44742 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1748,6 +1748,8 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; + uint64_t OldFileSize = 0; + // We retry to open the file since it can be held open for read. 
// This happens if the server processes a Get request for the file or // if we are busy sending the file upstream @@ -1757,7 +1759,27 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c Ec.clear(); { RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + + std::error_code ExistingEc; + OldFileSize = std::filesystem::file_size(FsPath, ExistingEc); + if (ExistingEc) + { + OldFileSize = 0; + } + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + + if (Ec && (!ExistingEc) && (OldFileSize == Value.Value.Size())) + { + ZEN_INFO( + "Failed to move temporary file '{}' to '{}'. Target file has same size, assuming concurrent write of same value, move " + "failed with reason '{}'", + DataFile.GetPath(), + FsPath.string(), + m_BucketDir, + Ec.message()); + return; + } } if (!Ec) @@ -1789,7 +1811,15 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + uint64_t NewFileSize = Loc.Size(); + if (OldFileSize <= NewFileSize) + { + m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed); + } + else + { + m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed); + } return; } @@ -3014,8 +3044,19 @@ TEST_CASE("z$.blocked.disklayer.put") ZenCacheValue BufferGet; CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); - MemoryView ValueView = BufferGet.Value.GetView(); - CHECK_THROWS(Zcs.Put("test_bucket", HashKey, {.Value = Buffer})); + // Overwriting with a value of same size should go fine + Zcs.Put("test_bucket", HashKey, {.Value = Buffer}); + + CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1); + IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); + Buffer2.SetContentType(ZenContentType::kCbObject); + // Overwriting with different size should throw exception if file is held open + CHECK_THROWS(Zcs.Put("test_bucket", HashKey, {.Value = Buffer2})); + 
+ BufferGet = ZenCacheValue{}; + + // Read access has been removed, we should now be able to overwrite it + Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); } # endif -- cgit v1.2.3 From 239e09c1df23e080c5d88cfb5d6af8eb63c232f9 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 9 May 2022 22:38:27 +0200 Subject: make test run on more platforms --- zenserver/cache/structuredcachestore.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index b6fd44742..a4cab881f 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -3009,13 +3009,8 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } -# if ZEN_PLATFORM_WINDOWS TEST_CASE("z$.blocked.disklayer.put") { - // On Windows platform we can't overwrite a standalone file that - // is open for read at the same time. 
- // Make sure the retry path runs and we get an exception - ScopedTemporaryDirectory TempDir; GcStorageSize CacheSize; @@ -3050,15 +3045,20 @@ TEST_CASE("z$.blocked.disklayer.put") CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1); IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); - // Overwriting with different size should throw exception if file is held open +# if ZEN_PLATFORM_WINDOWS + // On Windows platform, overwriting with different size while we have + // it open for read should throw exception if file is held open CHECK_THROWS(Zcs.Put("test_bucket", HashKey, {.Value = Buffer2})); +# else + // Other platforms should handle overwrite just fine + Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); +# endif BufferGet = ZenCacheValue{}; // Read access has been removed, we should now be able to overwrite it Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); } -# endif #endif -- cgit v1.2.3 From e67a43514bfba97fae4bc4ccf42ca312ba1d01bb Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 9 May 2022 23:31:29 +0200 Subject: happy path should be minimal work --- zenserver/cache/structuredcachestore.cpp | 46 ++++++++++++++++---------------- 1 file changed, 23 insertions(+), 23 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index a4cab881f..c3904d40a 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1725,6 +1725,8 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { + uint64_t NewFileSize = Value.Value.Size(); + TemporaryFile DataFile; std::error_code Ec; @@ -1739,7 +1741,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c { throw 
std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", - NiceBytes(Value.Value.Size()), + NiceBytes(NewFileSize), DataFile.GetPath().string(), m_BucketDir)); } @@ -1748,9 +1750,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; - uint64_t OldFileSize = 0; - - // We retry to open the file since it can be held open for read. + // We retry to move the file since it can be held open for read. // This happens if the server processes a Get request for the file or // if we are busy sending the file upstream int RetryCount = 3; @@ -1760,25 +1760,24 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c { RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - std::error_code ExistingEc; - OldFileSize = std::filesystem::file_size(FsPath, ExistingEc); - if (ExistingEc) - { - OldFileSize = 0; - } - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec && (!ExistingEc) && (OldFileSize == Value.Value.Size())) + if (Ec) { - ZEN_INFO( - "Failed to move temporary file '{}' to '{}'. Target file has same size, assuming concurrent write of same value, move " - "failed with reason '{}'", - DataFile.GetPath(), - FsPath.string(), - m_BucketDir, - Ec.message()); - return; + std::error_code ExistingEc; + uint64_t OldFileSize = std::filesystem::file_size(FsPath, ExistingEc); + if (!ExistingEc && (OldFileSize == NewFileSize)) + { + ZEN_INFO( + "Failed to move temporary file '{}' to '{}'. 
Target file has same size, assuming concurrent write of same value, " + "move " + "failed with reason '{}'", + DataFile.GetPath(), + FsPath.string(), + m_BucketDir, + Ec.message()); + return; + } } } @@ -1795,9 +1794,10 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c EntryFlags |= DiskLocation::kCompressed; } - DiskLocation Loc(Value.Value.Size(), EntryFlags); + DiskLocation Loc(NewFileSize, EntryFlags); IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); + uint64_t OldFileSize = 0; RwLock::ExclusiveLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It == m_Index.end()) { @@ -1807,11 +1807,11 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c else { // TODO: should check if write is idempotent and bail out if it is? - It.value() = Entry; + OldFileSize = It.value().Location.Size(); + It.value() = Entry; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - uint64_t NewFileSize = Loc.Size(); if (OldFileSize <= NewFileSize) { m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed); -- cgit v1.2.3 From 5d15fa59655c79a0c8ad1b4c5d44b657aa07c29e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 10 May 2022 10:08:31 +0200 Subject: Make sure we clean up temp file in all scenarios --- zenserver/cache/structuredcachestore.cpp | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index c3904d40a..ce55b24b6 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1736,6 +1736,22 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } + bool CleanUpTempFile = false; + auto __ = MakeGuard([&] { + if 
(CleanUpTempFile) + { + std::error_code Ec; + std::filesystem::remove(DataFile.GetPath(), Ec); + if (Ec) + { + ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", + DataFile.GetPath(), + m_BucketDir, + Ec.message()); + } + } + }); + DataFile.WriteAll(Value.Value, Ec); if (Ec) { @@ -1762,6 +1778,10 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file + // will be disabled as the file handle has already been closed + CleanUpTempFile = Ec ? true : false; + if (Ec) { std::error_code ExistingEc; @@ -1849,14 +1869,6 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c RetryCount--; } while (RetryCount > 0); - // Once we have called MoveTemporaryIntoPlace we no longer will automatically clean up the temp file - // as the file handle has already been closed - std::filesystem::remove(DataFile.GetPath(), Ec); - if (Ec) - { - ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", DataFile.GetPath(), m_BucketDir, Ec.message()); - } - throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); } -- cgit v1.2.3 From 185e819614252d510195b96c20ff39cdc6948f05 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 10 May 2022 13:45:06 +0200 Subject: Add namespace test --- zenserver/cache/structuredcachestore.cpp | 53 ++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index ce55b24b6..c21945702 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -3021,6 +3021,59 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } +TEST_CASE("z$.namespaces") +{ + using 
namespace testutils; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + ScopedTemporaryDirectory TempDir; + CreateDirectories(TempDir.Path()); + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "teardrinker"sv; + const auto CustomNamespace = "mynamespace"sv; + + // Create a cache record + const IoHash Key = CreateKey(42); + CbObject CacheValue = CreateCacheValue(4096); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key, PutValue); + + ZenCacheValue GetValue; + CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key, GetValue)); + + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue)); + + // This should just be dropped for now until we decide how we add namespaces + Zcs.Put(CustomNamespace, Bucket, Key, PutValue); + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue)); + + const IoHash Key2 = CreateKey(43); + CbObject CacheValue2 = CreateCacheValue(4096); + + IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); + Buffer2.SetContentType(ZenContentType::kCbObject); + ZenCacheValue PutValue2 = {.Value = Buffer2}; + Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2); + + CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); + } +} + TEST_CASE("z$.blocked.disklayer.put") { ScopedTemporaryDirectory TempDir; -- cgit v1.2.3 From 1274f92cf7ce890b7aa1fc9354503e2508c185eb Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 12 May 2022 13:48:45 +0200 Subject: Tests for HttpRequestParseRelativeUri --- zenserver/cache/structuredcachestore.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git 
a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index c21945702..5d1d39c50 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2128,8 +2128,6 @@ ZenCacheDiskLayer::TotalSize() const //////////////////////////// ZenCacheStore -static constexpr std::string_view ZenCacheNamespaceDirPrefix = "ns_"; - ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc) { CreateDirectories(BasePath); @@ -2142,7 +2140,7 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor for (const std::filesystem::path& DirPath : DirContent.Directories) { std::string DirName = PathToUtf8(DirPath.stem()); - if (DirName.starts_with(ZenCacheNamespaceDirPrefix)) + if (DirName.starts_with(NamespacePrefix)) { Namespaces.push_back(DirName.substr(3)); continue; @@ -2155,7 +2153,7 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor if (std::find(Namespaces.begin(), Namespaces.end(), DefaultNamespace) == Namespaces.end()) { ZEN_INFO("Moving #{} legacy buckets to anonymous namespace", LegacyBuckets.size()); - std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", ZenCacheNamespaceDirPrefix, DefaultNamespace); + std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespacePrefix, DefaultNamespace); CreateDirectories(DefaultNamespaceFolder); // Move any non-namespace folders into the default namespace folder @@ -2176,7 +2174,7 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor for (const std::string& NamespaceName : Namespaces) { m_Namespaces[NamespaceName] = - std::make_unique(Gc, BasePath / fmt::format("{}{}", ZenCacheNamespaceDirPrefix, NamespaceName)); + std::make_unique(Gc, BasePath / fmt::format("{}{}", NamespacePrefix, NamespaceName)); } } @@ -2284,8 +2282,9 @@ ZenCacheStore::StorageSize() const 
////////////////////////////////////////////////////////////////////////// +} // namespace zen + #if ZEN_WITH_TESTS -} namespace zen { -- cgit v1.2.3 From 2f69a30c936bc475bf85daded3706dc67f2e8a0f Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 16 May 2022 13:10:10 +0200 Subject: use ns_ prefix on disk only --- zenserver/cache/structuredcachestore.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 5d1d39c50..16130a98b 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2140,9 +2140,9 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor for (const std::filesystem::path& DirPath : DirContent.Directories) { std::string DirName = PathToUtf8(DirPath.stem()); - if (DirName.starts_with(NamespacePrefix)) + if (DirName.starts_with(NamespaceDiskPrefix)) { - Namespaces.push_back(DirName.substr(3)); + Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length())); continue; } LegacyBuckets.push_back(DirName); @@ -2153,7 +2153,8 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor if (std::find(Namespaces.begin(), Namespaces.end(), DefaultNamespace) == Namespaces.end()) { ZEN_INFO("Moving #{} legacy buckets to anonymous namespace", LegacyBuckets.size()); - std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespacePrefix, DefaultNamespace); + + std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, DefaultNamespace); CreateDirectories(DefaultNamespaceFolder); // Move any non-namespace folders into the default namespace folder @@ -2168,13 +2169,14 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, 
Ec.message()); } } + Namespaces.push_back(std::string(DefaultNamespace)); } for (const std::string& NamespaceName : Namespaces) { m_Namespaces[NamespaceName] = - std::make_unique(Gc, BasePath / fmt::format("{}{}", NamespacePrefix, NamespaceName)); + std::make_unique(Gc, BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName)); } } -- cgit v1.2.3 From a8f84317c4caec04ede7744356da1b7b2f15c545 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 17 May 2022 15:22:54 +0200 Subject: fix release build, misplaced namespace brackets --- zenserver/cache/structuredcachestore.cpp | 4 ---- 1 file changed, 4 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 16130a98b..6b7b73dcf 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2284,12 +2284,8 @@ ZenCacheStore::StorageSize() const ////////////////////////////////////////////////////////////////////////// -} // namespace zen - #if ZEN_WITH_TESTS -namespace zen { - using namespace std::literals; namespace testutils { -- cgit v1.2.3 From b373a645f787f28a4f7a831c161553df6fd4d72c Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 19 May 2022 15:11:35 +0200 Subject: migrate legacy cache folders to ue4.ddc namespace map default namespace to at runtime ue4.ddc use a non-valid name for the default namespace so we avoid any collision or accidental creation of folder for that --- zenserver/cache/structuredcachestore.cpp | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 6b7b73dcf..9218c2cbb 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2128,6 +2128,8 @@ ZenCacheDiskLayer::TotalSize() const 
//////////////////////////// ZenCacheStore +static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc"; + ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc) { CreateDirectories(BasePath); @@ -2150,11 +2152,13 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), BasePath, LegacyBuckets.size()); - if (std::find(Namespaces.begin(), Namespaces.end(), DefaultNamespace) == Namespaces.end()) + if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) { - ZEN_INFO("Moving #{} legacy buckets to anonymous namespace", LegacyBuckets.size()); + // default (unspecified) and ue4-ddc namespace points to the same namespace instance + + ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName); - std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, DefaultNamespace); + std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); CreateDirectories(DefaultNamespaceFolder); // Move any non-namespace folders into the default namespace folder @@ -2169,8 +2173,7 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message()); } } - - Namespaces.push_back(std::string(DefaultNamespace)); + Namespaces.push_back(std::string(UE4DDCNamespaceName)); } for (const std::string& NamespaceName : Namespaces) @@ -2237,6 +2240,13 @@ ZenCacheStore::GetNamespace(std::string_view Namespace) { return It->second.get(); } + if (Namespace == DefaultNamespace) + { + if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) + { + return It->second.get(); + } + } return nullptr; } @@ -2249,6 +2259,10 @@ 
ZenCacheStore::IterateNamespaces(const std::function Date: Thu, 19 May 2022 15:42:04 +0200 Subject: Fix and retry count and add an extra iteration to give more time for success --- zenserver/cache/structuredcachestore.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 6b7b73dcf..4edc13b4a 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1769,7 +1769,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c // We retry to move the file since it can be held open for read. // This happens if the server processes a Get request for the file or // if we are busy sending the file upstream - int RetryCount = 3; + int RetryCount = 4; do { Ec.clear(); @@ -1789,7 +1789,8 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c if (!ExistingEc && (OldFileSize == NewFileSize)) { ZEN_INFO( - "Failed to move temporary file '{}' to '{}'. Target file has same size, assuming concurrent write of same value, " + "Failed to move temporary file '{}' to '{}' for '{}'. Target file has same size, assuming concurrent write of same " + "value, " "move " "failed with reason '{}'", DataFile.GetPath(), @@ -1865,9 +1866,8 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c Ec.message()); // Semi arbitrary back-off - zen::Sleep(200 * (4 - RetryCount)); // Sleep at most for a total of 2 seconds - RetryCount--; - } while (RetryCount > 0); + zen::Sleep(200 * (5 - RetryCount)); // Sleep at most for a total of 3 seconds + } while (RetryCount-- > 0); throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); } -- cgit v1.2.3