diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-23 14:54:22 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-23 05:54:22 -0800 |
| commit | d361aa896e2e74ae4a790c4668c78c830f9b5d1c (patch) | |
| tree | c76518eaab8d4b6b0ba185bdec0fe07639729ea8 /zenserver/cache/structuredcachestore.cpp | |
| parent | junit test reporting (#239) (diff) | |
| download | zen-d361aa896e2e74ae4a790c4668c78c830f9b5d1c.tar.xz zen-d361aa896e2e74ae4a790c4668c78c830f9b5d1c.zip | |
store cache rawhash and rawsize for unstructured cache values (#234)
* refactored MemoryCacheBucket to allow for storing RawHash/RawSize.
* remove redundant conversions in AccessTime
* reduce max count for memory cache bucket to 32-bit value
* refactored DiskCacheBucket to allow for storing RawHash/RawSize.
* Use CompressedBuffer::ValidateCompressedHeader when applicable
* Make sure we rewrite the snapshot if we read an existing legacy index/log
* changelog
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 498 |
1 files changed, 366 insertions, 132 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index d93c54a06..55af85ade 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -52,7 +52,8 @@ namespace { struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t CurrentVersion = 2; + static constexpr uint32_t Version2 = 2; + static constexpr uint32_t CurrentVersion = 3; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; @@ -69,10 +70,18 @@ namespace { static_assert(sizeof(CacheBucketIndexHeader) == 32); + struct DiskIndexEntry_V2 + { + IoHash Key; // 20 bytes + DiskLocation Location; // 12 bytes + }; + + static_assert(sizeof(DiskIndexEntry_V2) == 32); #pragma pack(pop) const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; + const char* LogExtensionV2 = ".v2.slog"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { @@ -81,7 +90,12 @@ namespace { std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { - return BucketDir / (BucketName + ".tmp" + IndexExtension); + return BucketDir / (BucketName + ".tmp"); + } + + std::filesystem::path GetLogPathV2(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + LogExtensionV2); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) @@ -89,6 +103,32 @@ namespace { return BucketDir / (BucketName + LogExtension); } + bool ValidateEntry(const DiskIndexEntry_V2& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.GetFlags() & + ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | 
DiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -106,6 +146,19 @@ namespace { { return true; } + if (Entry.RawSize != 0) + { + if (Entry.RawHash == IoHash::Zero) + { + OutReason = fmt::format("Invalid raw hash for entry {}", Entry.Key.ToHexString()); + return false; + } + } + else if (Entry.RawHash != IoHash::Zero) + { + OutReason = fmt::format("Invalid raw size for entry {}", Entry.Key.ToHexString()); + return false; + } uint64_t Size = Entry.Location.Size(); if (Size == 0) { @@ -609,8 +662,9 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - OutValue.Value = m_Payloads[EntryIndex].Payload; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); + const BucketPayload& Payload = m_Payloads[EntryIndex]; + OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash}; + m_AccessTimes[EntryIndex] = GcClock::TickCount(); return true; } @@ -621,16 +675,7 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV void ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { - size_t PayloadSize = Value.Value.GetSize(); - IoHash RawHash = IoHash::Zero; - uint32_t RawSize = 0u; - // TODO: Temporary hack - should come from caller really as they likely already figured out at least rawhash (via attachment message) - // if (Value.Value.GetContentType() == 
ZenContentType::kCompressedBinary) - // { - // CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Value.Value)); - // RawHash = Compressed.DecodeRawHash(); - // RawSize = gsl::narrow<uint32_t>(Compressed.DecodeRawSize()); - // } + size_t PayloadSize = Value.Value.GetSize(); { GcClock::Tick AccessTime = GcClock::TickCount(); RwLock::ExclusiveLockScope _(m_BucketLock); @@ -647,14 +692,15 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed); BucketPayload& Payload = m_Payloads[EntryIndex]; Payload.Payload = Value.Value; - Payload.RawHash = RawHash; - Payload.RawSize = RawSize; + Payload.RawHash = Value.RawHash; + Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize); m_AccessTimes[EntryIndex] = AccessTime; } else { uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Payload = Value.Value, .RawSize = RawSize, .RawHash = RawHash}); + m_Payloads.emplace_back( + BucketPayload{.Payload = Value.Value, .RawSize = gsl::narrow<uint32_t>(Value.RawSize), .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(AccessTime); m_CacheMap.insert_or_assign(HashKey, EntryIndex); } @@ -698,10 +744,11 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo using namespace std::literals; m_BlocksBasePath = BucketDir / "blocks"; + m_BucketDir = BucketDir; - CreateDirectories(BucketDir); + CreateDirectories(m_BucketDir); - std::filesystem::path ManifestPath{BucketDir / "zen_manifest"}; + std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; bool IsNew = false; @@ -730,7 +777,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo return false; } - OpenLog(BucketDir, IsNew); + OpenLog(IsNew); for (CbFieldView Entry : Manifest["Timestamps"]) { @@ -796,9 +843,8 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() DiskIndexEntry& IndexEntry = 
Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = m_Payloads[Entry.second].Location; - // TODO: Update DiskIndexEntry - // IndexEntry.RawHash = m_Payloads[Entry.second].RawHash; - // IndexEntry.RawSize = m_Payloads[Entry.second].RawSize; + IndexEntry.RawHash = m_Payloads[Entry.second].RawHash; + IndexEntry.RawSize = m_Payloads[Entry.second].RawSize; } } @@ -836,18 +882,14 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() } uint64_t -ZenCacheDiskLayer::CacheBucket::ReadIndexFile() +ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { - std::vector<DiskIndexEntry> Entries; - std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); if (std::filesystem::is_regular_file(IndexPath)) { + size_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing #{} entries in {}", - IndexPath, - Entries.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + ZEN_INFO("read store '{}' index containing #{} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; @@ -855,49 +897,157 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile() uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CacheBucketIndexHeader)) { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) && - (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && - (Header.EntryCount <= ExpectedEntryCount)) + if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && + (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) { - 
Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); - m_PayloadAlignment = Header.PayloadAlignment; - - std::string InvalidEntryReason; - for (const DiskIndexEntry& Entry : Entries) + switch (Header.Version) { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - size_t EntryIndex = m_Payloads.size(); - // TODO: Get from stored index or check payload to get the relevant info - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0u; - m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = RawSize, .RawHash = RawHash}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(Entry.Key, EntryIndex); + case CacheBucketIndexHeader::Version2: + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry_V2); + if (Header.EntryCount > ExpectedEntryCount) + { + break; + } + m_PayloadAlignment = Header.PayloadAlignment; + + std::vector<DiskIndexEntry_V2> Entries; + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), + Header.EntryCount * sizeof(DiskIndexEntry_V2), + sizeof(CacheBucketIndexHeader)); + + m_Payloads.reserve(Header.EntryCount); + m_AccessTimes.reserve(Header.EntryCount); + m_Index.reserve(Header.EntryCount); + + std::string InvalidEntryReason; + for (const DiskIndexEntry_V2& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(Entry.Key, EntryIndex); + EntryCount++; + } + OutVersion = 
CacheBucketIndexHeader::Version2; + return Header.LogPosition; + } + break; + case CacheBucketIndexHeader::CurrentVersion: + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + if (Header.EntryCount > ExpectedEntryCount) + { + break; + } + m_PayloadAlignment = Header.PayloadAlignment; + + std::vector<DiskIndexEntry> Entries; + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), + Header.EntryCount * sizeof(DiskIndexEntry), + sizeof(CacheBucketIndexHeader)); + + m_Payloads.reserve(Header.EntryCount); + m_AccessTimes.reserve(Header.EntryCount); + m_Index.reserve(Header.EntryCount); + + std::string InvalidEntryReason; + for (const DiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back( + BucketPayload{.Location = Entry.Location, .RawSize = Entry.RawSize, .RawHash = Entry.RawHash}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(Entry.Key, EntryIndex); + EntryCount++; + } + OutVersion = CacheBucketIndexHeader::CurrentVersion; + return Header.LogPosition; + } + break; + default: + break; } + } + } + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + return 0; +} - return Header.LogPosition; +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadLogV2(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + if (std::filesystem::is_regular_file(LogPath)) + { + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<DiskIndexEntry_V2> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if 
(EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; } - else + LogEntryCount = EntryCount - SkipEntryCount; + m_Index.reserve(LogEntryCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const DiskIndexEntry_V2& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + size_t EntryIndex = m_Payloads.size(); + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0u; + m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = RawSize, .RawHash = RawHash}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(Record.Key, EntryIndex); + }, + SkipEntryCount); + if (InvalidEntryCount) { - ZEN_WARN("skipping invalid index file '{}'", IndexPath); + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); } + return LogEntryCount; } } return 0; -} +}; uint64_t -ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) +ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); if (std::filesystem::is_regular_file(LogPath)) { uint64_t LogEntryCount = 0; @@ -933,10 +1083,8 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) return; } size_t EntryIndex = m_Payloads.size(); - // TODO: Get from stored index or check payload to get the relevant info - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0u; - m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = RawSize, .RawHash = RawHash}); + m_Payloads.emplace_back( + BucketPayload{.Location = 
Record.Location, .RawSize = Record.RawSize, .RawHash = Record.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(Record.Key, EntryIndex); }, @@ -952,37 +1100,72 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) }; void -ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) { - m_BucketDir = BucketDir; - m_TotalStandaloneSize = 0; m_Index.clear(); + m_Payloads.clear(); + m_AccessTimes.clear(); + std::filesystem::path LogPathV2 = GetLogPathV2(m_BucketDir, m_BucketName); std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); if (IsNew) { fs::remove(LogPath); + fs::remove(LogPathV2); fs::remove(IndexPath); fs::remove_all(m_BlocksBasePath); } - m_LogFlushPosition = ReadIndexFile(); - uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); + bool RewriteLog = false; + uint64_t LogEntryCount = 0; + { + uint32_t IndexVersion = 0; + m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); + if ((IndexVersion != CacheBucketIndexHeader::CurrentVersion) && TCasLogFile<DiskIndexEntry_V2>::IsValid(LogPath)) + { + if (std::filesystem::exists(LogPathV2)) + { + std::filesystem::remove(LogPathV2); + } + std::filesystem::rename(LogPath, LogPathV2); + LogEntryCount = ReadLogV2(LogPathV2, m_LogFlushPosition); + RewriteLog = true; + // We have a new snapshot format, so we have not flushed any log entries + LogEntryCount += m_LogFlushPosition; + m_LogFlushPosition = 0; + } + else + { + LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); + } + } CreateDirectories(m_BucketDir); m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + std::vector<DiskIndexEntry> ConvertedEntries; + if (RewriteLog) + { + ConvertedEntries.reserve(m_Index.size()); + } std::vector<BlockStoreLocation> KnownLocations; KnownLocations.reserve(m_Index.size()); for (const auto& Entry : 
m_Index) { - size_t EntryIndex = Entry.second; - const DiskLocation& Location = m_Payloads[EntryIndex].Location; + size_t EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + const DiskLocation& Location = Payload.Location; + + if (RewriteLog) + { + ConvertedEntries.push_back({.RawSize = Payload.RawSize, .Location = Location, .RawHash = Payload.RawHash, .Key = Entry.first}); + } + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); @@ -991,13 +1174,23 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); KnownLocations.push_back(BlockLocation); } + if (!ConvertedEntries.empty()) + { + m_SlogFile.Append(ConvertedEntries); + std::error_code Ec; + std::filesystem::remove(LogPathV2); + if (Ec) + { + ZEN_WARN("failed to remove legacy log file '{}' FAILED, reason: '{}'", LogPathV2, Ec.message()); + } + } m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); - if (IsNew || LogEntryCount > 0) { MakeIndexSnapshot(); } + // TODO: should validate integrity of container files here } @@ -1018,23 +1211,22 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); } -bool -ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) +IoBuffer +ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) { BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); - OutValue.Value = m_BlockStore.TryGetChunk(Location); - if (!OutValue.Value) + IoBuffer Value = m_BlockStore.TryGetChunk(Location); + if (Value) { - return false; + Value.SetContentType(Loc.GetContentType()); } - OutValue.Value.SetContentType(Loc.GetContentType()); - 
return true; + return Value; } -bool -ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue) +IoBuffer +ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) { ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); @@ -1043,13 +1235,12 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) { - OutValue.Value = Data; - OutValue.Value.SetContentType(Loc.GetContentType()); + Data.SetContentType(Loc.GetContentType()); - return true; + return Data; } - return false; + return {}; } bool @@ -1061,17 +1252,50 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal { return false; } - size_t EntryIndex = It.value(); - BucketPayload Payload = m_Payloads[EntryIndex]; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - DiskLocation Location = Payload.Location; + size_t EntryIndex = It.value(); + const BucketPayload& Payload = m_Payloads[EntryIndex]; + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = Payload.Location; + OutValue.RawSize = Payload.RawSize; + OutValue.RawHash = Payload.RawHash; if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { // We don't need to hold the index lock when we read a standalone file _.ReleaseNow(); - return GetStandaloneCacheValue(Location, HashKey, OutValue); + OutValue.Value = GetStandaloneCacheValue(Location, HashKey); } - return GetInlineCacheValue(Location, OutValue); + else + { + OutValue.Value = GetInlineCacheValue(Location); + } + _.ReleaseNow(); + + if (!Location.IsFlagSet(DiskLocation::kStructured)) + { + if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0) + { + if (Location.IsFlagSet(DiskLocation::kCompressed)) + { + (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), 
OutValue.RawHash, OutValue.RawSize); + } + else + { + OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); + OutValue.RawSize = OutValue.Value.GetSize(); + } + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) + { + BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; + WritePayload.RawHash = OutValue.RawHash; + WritePayload.RawSize = OutValue.RawSize; + + m_LogFlushPosition = 0; // Force resave of index on exit + } + } + } + + return (bool)OutValue.Value; } void @@ -1101,6 +1325,8 @@ ZenCacheDiskLayer::CacheBucket::Drop() bool Deleted = MoveAndDeleteDirectory(m_BucketDir); m_Index.clear(); + m_Payloads.clear(); + m_AccessTimes.clear(); return Deleted; } @@ -1207,13 +1433,13 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) } continue; } - ZenCacheValue Value; - if (!GetStandaloneCacheValue(Loc, HashKey, Value)) + IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); + if (!Buffer) { BadKeys.push_back(HashKey); continue; } - if (!ValidateEntry(HashKey, Loc.GetContentType(), Value.Value)) + if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer)) { BadKeys.push_back(HashKey); continue; @@ -1294,10 +1520,12 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) for (const IoHash& BadKey : BadKeys) { // Log a tombstone and delete the in-memory index for the bad entry - const auto It = m_Index.find(BadKey); - DiskLocation Location = m_Payloads[It->second].Location; + const auto It = m_Index.find(BadKey); + const BucketPayload& Payload = m_Payloads[It->second]; + DiskLocation Location = Payload.Location; Location.Flags |= DiskLocation::kTombStone; - LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); + LogEntries.push_back( + DiskIndexEntry{.RawSize = Payload.RawSize, .Location = Location, .RawHash = Payload.RawHash, .Key = BadKey}); m_Index.erase(BadKey); } } @@ -1380,7 +1608,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) const 
GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - IndexMap Index; + IndexMap Index; + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; @@ -1390,6 +1620,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); Index = m_Index; + AccessTimes = m_AccessTimes; + Payloads = m_Payloads; } std::vector<IoHash> ExpiredKeys; @@ -1401,14 +1633,14 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) for (const auto& Entry : Index) { const IoHash& Key = Entry.first; - GcClock::Tick AccessTime = m_AccessTimes[Entry.second]; + GcClock::Tick AccessTime = AccessTimes[Entry.second]; if (AccessTime < ExpireTicks) { ExpiredKeys.push_back(Key); continue; } - const DiskLocation& Loc = m_Payloads[Entry.second].Location; + const DiskLocation& Loc = Payloads[Entry.second].Location; if (Loc.IsFlagSet(DiskLocation::kStructured)) { @@ -1418,7 +1650,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) Cids.clear(); } - ZenCacheValue CacheValue; + IoBuffer Buffer; { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; @@ -1431,20 +1663,20 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { // We don't need to hold the index lock when we read a standalone file __.ReleaseNow(); - if (!GetStandaloneCacheValue(Loc, Key, CacheValue)) + if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) { continue; } } - else if (!GetInlineCacheValue(Loc, CacheValue)) + else if (Buffer = GetInlineCacheValue(Loc); !Buffer) { continue; } } - ZEN_ASSERT(CacheValue.Value); - ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); - CbObject Obj(SharedBuffer{CacheValue.Value}); + ZEN_ASSERT(Buffer); + ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); + CbObject Obj(SharedBuffer{Buffer}); 
Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } } @@ -1571,7 +1803,11 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { if (auto It = Index.find(Key); It != Index.end()) { - DiskIndexEntry Entry = {.Key = It->first, .Location = m_Payloads[It->second].Location}; + const BucketPayload& Payload = m_Payloads[It->second]; + DiskIndexEntry Entry = {.RawSize = Payload.RawSize, + .Location = Payload.Location, + .RawHash = Payload.RawHash, + .Key = It->first}; if (Entry.Location.Flags & DiskLocation::kStandaloneFile) { Entry.Location.Flags |= DiskLocation::kTombStone; @@ -1647,12 +1883,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { continue; } - m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); + m_SlogFile.Append( + DiskIndexEntry{.RawSize = Entry.RawSize, .Location = RestoreLocation, .RawHash = Entry.RawHash, .Key = Key}); size_t EntryIndex = m_Payloads.size(); - // TODO: Get from stored index or check payload to get the relevant info - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0u; - m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation, .RawSize = RawSize, .RawHash = RawHash}); + m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation, .RawSize = Entry.RawSize, .RawHash = Entry.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert({Key, EntryIndex}); m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); @@ -1734,18 +1968,24 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) size_t ChunkIndex = Entry.first; const BlockStoreLocation& NewLocation = Entry.second; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const DiskLocation& OldDiskLocation = m_Payloads[Index[ChunkHash]].Location; - LogEntries.push_back( - {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); + const BucketPayload& 
OldPayload = m_Payloads[Index[ChunkHash]]; + const DiskLocation& OldDiskLocation = OldPayload.Location; + LogEntries.push_back({.RawSize = OldPayload.RawSize, + .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags()), + .RawHash = OldPayload.RawHash, + .Key = ChunkHash}); } for (const size_t ChunkIndex : RemovedChunks) { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const DiskLocation& OldDiskLocation = m_Payloads[Index[ChunkHash]].Location; - LogEntries.push_back({.Key = ChunkHash, + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; + const DiskLocation& OldDiskLocation = OldPayload.Location; + LogEntries.push_back({.RawSize = OldPayload.RawSize, .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), m_PayloadAlignment, - OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); + OldDiskLocation.GetFlags() | DiskLocation::kTombStone), + .RawHash = OldPayload.RawHash, + .Key = ChunkHash}); DeletedChunks.insert(ChunkHash); } @@ -1929,16 +2169,13 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } DiskLocation Loc(NewFileSize, EntryFlags); - // TODO: Get from caller input - IoHash RawHash = IoHash::Zero; - size_t RawSize = 0u; RwLock::ExclusiveLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = RawSize, .RawHash = RawHash}); + m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } @@ -1947,12 +2184,12 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c // TODO: should check if write is idempotent and bail out if it is? 
size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = RawSize, .RawHash = RawHash}; + m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; m_AccessTimes.emplace_back(GcClock::TickCount()); m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); } - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_SlogFile.Append({.RawSize = Value.RawSize, .Location = Loc, .RawHash = Value.RawHash, .Key = HashKey}); m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); } @@ -1971,12 +2208,9 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const } m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { - DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); - const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; - m_SlogFile.Append(DiskIndexEntry); - // TODO: Get from caller input - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0u; + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + m_SlogFile.Append({.RawSize = Value.RawSize, .Location = Location, .RawHash = Value.RawHash, .Key = HashKey}); + RwLock::ExclusiveLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) { @@ -1985,13 +2219,13 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const // content hash to the index entry size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = RawSize, .RawHash = RawHash}); + m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes[EntryIndex] = GcClock::TickCount(); } else { size_t 
EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = RawSize, .RawHash = RawHash}); + m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } @@ -2605,7 +2839,7 @@ TEST_CASE("z$.size") for (size_t Key = 0; Key < Count; ++Key) { const size_t Bucket = Key % 4; - Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); + Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), ZenCacheValue{.Value = Buffer}); } CacheSize = Zcs.StorageSize(); |