aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2023-02-23 14:54:22 +0100
committer GitHub <[email protected]> 2023-02-23 05:54:22 -0800
commit d361aa896e2e74ae4a790c4668c78c830f9b5d1c (patch)
tree c76518eaab8d4b6b0ba185bdec0fe07639729ea8 /zenserver/cache/structuredcachestore.cpp
parent junit test reporting (#239) (diff)
download zen-d361aa896e2e74ae4a790c4668c78c830f9b5d1c.tar.xz
zen-d361aa896e2e74ae4a790c4668c78c830f9b5d1c.zip
store cache rawhash and rawsize for unstructured cache values (#234)
* refactored MemoryCacheBucket to allow for storing RawHash/RawSize. * remove redundant conversions in AccessTime * reduce max count for memory cache bucket to 32-bit value * refactored DiskCacheBucket to allow for storing RawHash/RawSize. * Use CompressedBuffer::ValidateCompressedHeader when applicable * Make sure we rewrite the snapshot if we read an existing legacy index/log * changelog
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r-- zenserver/cache/structuredcachestore.cpp 498
1 files changed, 366 insertions, 132 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index d93c54a06..55af85ade 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -52,7 +52,8 @@ namespace {
struct CacheBucketIndexHeader
{
static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
- static constexpr uint32_t CurrentVersion = 2;
+ static constexpr uint32_t Version2 = 2;
+ static constexpr uint32_t CurrentVersion = 3;
uint32_t Magic = ExpectedMagic;
uint32_t Version = CurrentVersion;
@@ -69,10 +70,18 @@ namespace {
static_assert(sizeof(CacheBucketIndexHeader) == 32);
+ struct DiskIndexEntry_V2
+ {
+ IoHash Key; // 20 bytes
+ DiskLocation Location; // 12 bytes
+ };
+
+ static_assert(sizeof(DiskIndexEntry_V2) == 32);
#pragma pack(pop)
const char* IndexExtension = ".uidx";
const char* LogExtension = ".slog";
+ const char* LogExtensionV2 = ".v2.slog";
std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
@@ -81,7 +90,12 @@ namespace {
std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
- return BucketDir / (BucketName + ".tmp" + IndexExtension);
+ return BucketDir / (BucketName + ".tmp");
+ }
+
+ std::filesystem::path GetLogPathV2(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + LogExtensionV2);
}
std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
@@ -89,6 +103,32 @@ namespace {
return BucketDir / (BucketName + LogExtension);
}
+ bool ValidateEntry(const DiskIndexEntry_V2& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.GetFlags() &
+ ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+ {
+ return true;
+ }
+ uint64_t Size = Entry.Location.Size();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -106,6 +146,19 @@ namespace {
{
return true;
}
+ if (Entry.RawSize != 0)
+ {
+ if (Entry.RawHash == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid raw hash for entry {}", Entry.Key.ToHexString());
+ return false;
+ }
+ }
+ else if (Entry.RawHash != IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid raw size for entry {}", Entry.Key.ToHexString());
+ return false;
+ }
uint64_t Size = Entry.Location.Size();
if (Size == 0)
{
@@ -609,8 +662,9 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
- OutValue.Value = m_Payloads[EntryIndex].Payload;
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash};
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
return true;
}
@@ -621,16 +675,7 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
void
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
- size_t PayloadSize = Value.Value.GetSize();
- IoHash RawHash = IoHash::Zero;
- uint32_t RawSize = 0u;
- // TODO: Temporary hack - should come from caller really as they likely already figured out at least rawhash (via attachment message)
- // if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- // {
- // CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Value.Value));
- // RawHash = Compressed.DecodeRawHash();
- // RawSize = gsl::narrow<uint32_t>(Compressed.DecodeRawSize());
- // }
+ size_t PayloadSize = Value.Value.GetSize();
{
GcClock::Tick AccessTime = GcClock::TickCount();
RwLock::ExclusiveLockScope _(m_BucketLock);
@@ -647,14 +692,15 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed);
BucketPayload& Payload = m_Payloads[EntryIndex];
Payload.Payload = Value.Value;
- Payload.RawHash = RawHash;
- Payload.RawSize = RawSize;
+ Payload.RawHash = Value.RawHash;
+ Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize);
m_AccessTimes[EntryIndex] = AccessTime;
}
else
{
uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Payload = Value.Value, .RawSize = RawSize, .RawHash = RawHash});
+ m_Payloads.emplace_back(
+ BucketPayload{.Payload = Value.Value, .RawSize = gsl::narrow<uint32_t>(Value.RawSize), .RawHash = Value.RawHash});
m_AccessTimes.emplace_back(AccessTime);
m_CacheMap.insert_or_assign(HashKey, EntryIndex);
}
@@ -698,10 +744,11 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
using namespace std::literals;
m_BlocksBasePath = BucketDir / "blocks";
+ m_BucketDir = BucketDir;
- CreateDirectories(BucketDir);
+ CreateDirectories(m_BucketDir);
- std::filesystem::path ManifestPath{BucketDir / "zen_manifest"};
+ std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};
bool IsNew = false;
@@ -730,7 +777,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
return false;
}
- OpenLog(BucketDir, IsNew);
+ OpenLog(IsNew);
for (CbFieldView Entry : Manifest["Timestamps"])
{
@@ -796,9 +843,8 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
IndexEntry.Key = Entry.first;
IndexEntry.Location = m_Payloads[Entry.second].Location;
- // TODO: Update DiskIndexEntry
- // IndexEntry.RawHash = m_Payloads[Entry.second].RawHash;
- // IndexEntry.RawSize = m_Payloads[Entry.second].RawSize;
+ IndexEntry.RawHash = m_Payloads[Entry.second].RawHash;
+ IndexEntry.RawSize = m_Payloads[Entry.second].RawSize;
}
}
@@ -836,18 +882,14 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
}
uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadIndexFile()
+ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
- std::vector<DiskIndexEntry> Entries;
- std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
if (std::filesystem::is_regular_file(IndexPath))
{
+ size_t EntryCount = 0;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' index containing #{} entries in {}",
- IndexPath,
- Entries.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ ZEN_INFO("read store '{}' index containing #{} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
BasicFile ObjectIndexFile;
@@ -855,49 +897,157 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile()
uint64_t Size = ObjectIndexFile.FileSize();
if (Size >= sizeof(CacheBucketIndexHeader))
{
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
CacheBucketIndexHeader Header;
ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) &&
- (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
- (Header.EntryCount <= ExpectedEntryCount))
+ if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) &&
+ (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0))
{
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
- m_PayloadAlignment = Header.PayloadAlignment;
-
- std::string InvalidEntryReason;
- for (const DiskIndexEntry& Entry : Entries)
+ switch (Header.Version)
{
- if (!ValidateEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
- continue;
- }
- size_t EntryIndex = m_Payloads.size();
- // TODO: Get from stored index or check payload to get the relevant info
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = 0u;
- m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = RawSize, .RawHash = RawHash});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- m_Index.insert_or_assign(Entry.Key, EntryIndex);
+ case CacheBucketIndexHeader::Version2:
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry_V2);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ break;
+ }
+ m_PayloadAlignment = Header.PayloadAlignment;
+
+ std::vector<DiskIndexEntry_V2> Entries;
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(),
+ Header.EntryCount * sizeof(DiskIndexEntry_V2),
+ sizeof(CacheBucketIndexHeader));
+
+ m_Payloads.reserve(Header.EntryCount);
+ m_AccessTimes.reserve(Header.EntryCount);
+ m_Index.reserve(Header.EntryCount);
+
+ std::string InvalidEntryReason;
+ for (const DiskIndexEntry_V2& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ size_t EntryIndex = m_Payloads.size();
+ m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert_or_assign(Entry.Key, EntryIndex);
+ EntryCount++;
+ }
+ OutVersion = CacheBucketIndexHeader::Version2;
+ return Header.LogPosition;
+ }
+ break;
+ case CacheBucketIndexHeader::CurrentVersion:
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ break;
+ }
+ m_PayloadAlignment = Header.PayloadAlignment;
+
+ std::vector<DiskIndexEntry> Entries;
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(),
+ Header.EntryCount * sizeof(DiskIndexEntry),
+ sizeof(CacheBucketIndexHeader));
+
+ m_Payloads.reserve(Header.EntryCount);
+ m_AccessTimes.reserve(Header.EntryCount);
+ m_Index.reserve(Header.EntryCount);
+
+ std::string InvalidEntryReason;
+ for (const DiskIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+
+ size_t EntryIndex = m_Payloads.size();
+ m_Payloads.emplace_back(
+ BucketPayload{.Location = Entry.Location, .RawSize = Entry.RawSize, .RawHash = Entry.RawHash});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert_or_assign(Entry.Key, EntryIndex);
+ EntryCount++;
+ }
+ OutVersion = CacheBucketIndexHeader::CurrentVersion;
+ return Header.LogPosition;
+ }
+ break;
+ default:
+ break;
}
+ }
+ }
+ ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+ }
+ return 0;
+}
- return Header.LogPosition;
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadLogV2(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
+{
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ TCasLogFile<DiskIndexEntry_V2> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (CasLog.Initialize())
+ {
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
}
- else
+ LogEntryCount = EntryCount - SkipEntryCount;
+ m_Index.reserve(LogEntryCount);
+ uint64_t InvalidEntryCount = 0;
+ CasLog.Replay(
+ [&](const DiskIndexEntry_V2& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Location.Flags & DiskLocation::kTombStone)
+ {
+ m_Index.erase(Record.Key);
+ return;
+ }
+ if (!ValidateEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ size_t EntryIndex = m_Payloads.size();
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0u;
+ m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = RawSize, .RawHash = RawHash});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert_or_assign(Record.Key, EntryIndex);
+ },
+ SkipEntryCount);
+ if (InvalidEntryCount)
{
- ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
}
+ return LogEntryCount;
}
}
return 0;
-}
+};
uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
+ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
if (std::filesystem::is_regular_file(LogPath))
{
uint64_t LogEntryCount = 0;
@@ -933,10 +1083,8 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
return;
}
size_t EntryIndex = m_Payloads.size();
- // TODO: Get from stored index or check payload to get the relevant info
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = 0u;
- m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = RawSize, .RawHash = RawHash});
+ m_Payloads.emplace_back(
+ BucketPayload{.Location = Record.Location, .RawSize = Record.RawSize, .RawHash = Record.RawHash});
m_AccessTimes.emplace_back(GcClock::TickCount());
m_Index.insert_or_assign(Record.Key, EntryIndex);
},
@@ -952,37 +1100,72 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
};
void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
+ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
{
- m_BucketDir = BucketDir;
-
m_TotalStandaloneSize = 0;
m_Index.clear();
+ m_Payloads.clear();
+ m_AccessTimes.clear();
+ std::filesystem::path LogPathV2 = GetLogPathV2(m_BucketDir, m_BucketName);
std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
if (IsNew)
{
fs::remove(LogPath);
+ fs::remove(LogPathV2);
fs::remove(IndexPath);
fs::remove_all(m_BlocksBasePath);
}
- m_LogFlushPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
+ bool RewriteLog = false;
+ uint64_t LogEntryCount = 0;
+ {
+ uint32_t IndexVersion = 0;
+ m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion);
+ if ((IndexVersion != CacheBucketIndexHeader::CurrentVersion) && TCasLogFile<DiskIndexEntry_V2>::IsValid(LogPath))
+ {
+ if (std::filesystem::exists(LogPathV2))
+ {
+ std::filesystem::remove(LogPathV2);
+ }
+ std::filesystem::rename(LogPath, LogPathV2);
+ LogEntryCount = ReadLogV2(LogPathV2, m_LogFlushPosition);
+ RewriteLog = true;
+ // We have a new snapshot format, so we have not flushed any log entries
+ LogEntryCount += m_LogFlushPosition;
+ m_LogFlushPosition = 0;
+ }
+ else
+ {
+ LogEntryCount = ReadLog(LogPath, m_LogFlushPosition);
+ }
+ }
CreateDirectories(m_BucketDir);
m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+ std::vector<DiskIndexEntry> ConvertedEntries;
+ if (RewriteLog)
+ {
+ ConvertedEntries.reserve(m_Index.size());
+ }
std::vector<BlockStoreLocation> KnownLocations;
KnownLocations.reserve(m_Index.size());
for (const auto& Entry : m_Index)
{
- size_t EntryIndex = Entry.second;
- const DiskLocation& Location = m_Payloads[EntryIndex].Location;
+ size_t EntryIndex = Entry.second;
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ const DiskLocation& Location = Payload.Location;
+
+ if (RewriteLog)
+ {
+ ConvertedEntries.push_back({.RawSize = Payload.RawSize, .Location = Location, .RawHash = Payload.RawHash, .Key = Entry.first});
+ }
+
if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
@@ -991,13 +1174,23 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
KnownLocations.push_back(BlockLocation);
}
+ if (!ConvertedEntries.empty())
+ {
+ m_SlogFile.Append(ConvertedEntries);
+ std::error_code Ec;
+ std::filesystem::remove(LogPathV2);
+ if (Ec)
+ {
+ ZEN_WARN("failed to remove legacy log file '{}' FAILED, reason: '{}'", LogPathV2, Ec.message());
+ }
+ }
m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
-
if (IsNew || LogEntryCount > 0)
{
MakeIndexSnapshot();
}
+
// TODO: should validate integrity of container files here
}
@@ -1018,23 +1211,22 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H
Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString));
}
-bool
-ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue)
+IoBuffer
+ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc)
{
BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);
- OutValue.Value = m_BlockStore.TryGetChunk(Location);
- if (!OutValue.Value)
+ IoBuffer Value = m_BlockStore.TryGetChunk(Location);
+ if (Value)
{
- return false;
+ Value.SetContentType(Loc.GetContentType());
}
- OutValue.Value.SetContentType(Loc.GetContentType());
- return true;
+ return Value;
}
-bool
-ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue)
+IoBuffer
+ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey)
{
ExtendablePathBuilder<256> DataFilePath;
BuildPath(DataFilePath, HashKey);
@@ -1043,13 +1235,12 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
{
- OutValue.Value = Data;
- OutValue.Value.SetContentType(Loc.GetContentType());
+ Data.SetContentType(Loc.GetContentType());
- return true;
+ return Data;
}
- return false;
+ return {};
}
bool
@@ -1061,17 +1252,50 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
{
return false;
}
- size_t EntryIndex = It.value();
- BucketPayload Payload = m_Payloads[EntryIndex];
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
- DiskLocation Location = Payload.Location;
+ size_t EntryIndex = It.value();
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ DiskLocation Location = Payload.Location;
+ OutValue.RawSize = Payload.RawSize;
+ OutValue.RawHash = Payload.RawHash;
if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
// We don't need to hold the index lock when we read a standalone file
_.ReleaseNow();
- return GetStandaloneCacheValue(Location, HashKey, OutValue);
+ OutValue.Value = GetStandaloneCacheValue(Location, HashKey);
}
- return GetInlineCacheValue(Location, OutValue);
+ else
+ {
+ OutValue.Value = GetInlineCacheValue(Location);
+ }
+ _.ReleaseNow();
+
+ if (!Location.IsFlagSet(DiskLocation::kStructured))
+ {
+ if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0)
+ {
+ if (Location.IsFlagSet(DiskLocation::kCompressed))
+ {
+ (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize);
+ }
+ else
+ {
+ OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+ OutValue.RawSize = OutValue.Value.GetSize();
+ }
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
+ {
+ BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
+ WritePayload.RawHash = OutValue.RawHash;
+ WritePayload.RawSize = OutValue.RawSize;
+
+ m_LogFlushPosition = 0; // Force resave of index on exit
+ }
+ }
+ }
+
+ return (bool)OutValue.Value;
}
void
@@ -1101,6 +1325,8 @@ ZenCacheDiskLayer::CacheBucket::Drop()
bool Deleted = MoveAndDeleteDirectory(m_BucketDir);
m_Index.clear();
+ m_Payloads.clear();
+ m_AccessTimes.clear();
return Deleted;
}
@@ -1207,13 +1433,13 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
continue;
}
- ZenCacheValue Value;
- if (!GetStandaloneCacheValue(Loc, HashKey, Value))
+ IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
+ if (!Buffer)
{
BadKeys.push_back(HashKey);
continue;
}
- if (!ValidateEntry(HashKey, Loc.GetContentType(), Value.Value))
+ if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer))
{
BadKeys.push_back(HashKey);
continue;
@@ -1294,10 +1520,12 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
for (const IoHash& BadKey : BadKeys)
{
// Log a tombstone and delete the in-memory index for the bad entry
- const auto It = m_Index.find(BadKey);
- DiskLocation Location = m_Payloads[It->second].Location;
+ const auto It = m_Index.find(BadKey);
+ const BucketPayload& Payload = m_Payloads[It->second];
+ DiskLocation Location = Payload.Location;
Location.Flags |= DiskLocation::kTombStone;
- LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
+ LogEntries.push_back(
+ DiskIndexEntry{.RawSize = Payload.RawSize, .Location = Location, .RawHash = Payload.RawHash, .Key = BadKey});
m_Index.erase(BadKey);
}
}
@@ -1380,7 +1608,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
- IndexMap Index;
+ IndexMap Index;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
{
RwLock::SharedLockScope __(m_IndexLock);
Stopwatch Timer;
@@ -1390,6 +1620,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
Index = m_Index;
+ AccessTimes = m_AccessTimes;
+ Payloads = m_Payloads;
}
std::vector<IoHash> ExpiredKeys;
@@ -1401,14 +1633,14 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
for (const auto& Entry : Index)
{
const IoHash& Key = Entry.first;
- GcClock::Tick AccessTime = m_AccessTimes[Entry.second];
+ GcClock::Tick AccessTime = AccessTimes[Entry.second];
if (AccessTime < ExpireTicks)
{
ExpiredKeys.push_back(Key);
continue;
}
- const DiskLocation& Loc = m_Payloads[Entry.second].Location;
+ const DiskLocation& Loc = Payloads[Entry.second].Location;
if (Loc.IsFlagSet(DiskLocation::kStructured))
{
@@ -1418,7 +1650,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
Cids.clear();
}
- ZenCacheValue CacheValue;
+ IoBuffer Buffer;
{
RwLock::SharedLockScope __(m_IndexLock);
Stopwatch Timer;
@@ -1431,20 +1663,20 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
// We don't need to hold the index lock when we read a standalone file
__.ReleaseNow();
- if (!GetStandaloneCacheValue(Loc, Key, CacheValue))
+ if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer)
{
continue;
}
}
- else if (!GetInlineCacheValue(Loc, CacheValue))
+ else if (Buffer = GetInlineCacheValue(Loc); !Buffer)
{
continue;
}
}
- ZEN_ASSERT(CacheValue.Value);
- ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
- CbObject Obj(SharedBuffer{CacheValue.Value});
+ ZEN_ASSERT(Buffer);
+ ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
+ CbObject Obj(SharedBuffer{Buffer});
Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
}
}
@@ -1571,7 +1803,11 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
if (auto It = Index.find(Key); It != Index.end())
{
- DiskIndexEntry Entry = {.Key = It->first, .Location = m_Payloads[It->second].Location};
+ const BucketPayload& Payload = m_Payloads[It->second];
+ DiskIndexEntry Entry = {.RawSize = Payload.RawSize,
+ .Location = Payload.Location,
+ .RawHash = Payload.RawHash,
+ .Key = It->first};
if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
{
Entry.Location.Flags |= DiskLocation::kTombStone;
@@ -1647,12 +1883,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
continue;
}
- m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
+ m_SlogFile.Append(
+ DiskIndexEntry{.RawSize = Entry.RawSize, .Location = RestoreLocation, .RawHash = Entry.RawHash, .Key = Key});
size_t EntryIndex = m_Payloads.size();
- // TODO: Get from stored index or check payload to get the relevant info
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = 0u;
- m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation, .RawSize = RawSize, .RawHash = RawHash});
+ m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation, .RawSize = Entry.RawSize, .RawHash = Entry.RawHash});
m_AccessTimes.emplace_back(GcClock::TickCount());
m_Index.insert({Key, EntryIndex});
m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed);
@@ -1734,18 +1968,24 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
size_t ChunkIndex = Entry.first;
const BlockStoreLocation& NewLocation = Entry.second;
const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const DiskLocation& OldDiskLocation = m_Payloads[Index[ChunkHash]].Location;
- LogEntries.push_back(
- {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())});
+ const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]];
+ const DiskLocation& OldDiskLocation = OldPayload.Location;
+ LogEntries.push_back({.RawSize = OldPayload.RawSize,
+ .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags()),
+ .RawHash = OldPayload.RawHash,
+ .Key = ChunkHash});
}
for (const size_t ChunkIndex : RemovedChunks)
{
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const DiskLocation& OldDiskLocation = m_Payloads[Index[ChunkHash]].Location;
- LogEntries.push_back({.Key = ChunkHash,
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]];
+ const DiskLocation& OldDiskLocation = OldPayload.Location;
+ LogEntries.push_back({.RawSize = OldPayload.RawSize,
.Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment),
m_PayloadAlignment,
- OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
+ OldDiskLocation.GetFlags() | DiskLocation::kTombStone),
+ .RawHash = OldPayload.RawHash,
+ .Key = ChunkHash});
DeletedChunks.insert(ChunkHash);
}
@@ -1929,16 +2169,13 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
}
DiskLocation Loc(NewFileSize, EntryFlags);
- // TODO: Get from caller input
- IoHash RawHash = IoHash::Zero;
- size_t RawSize = 0u;
RwLock::ExclusiveLockScope _(m_IndexLock);
if (auto It = m_Index.find(HashKey); It == m_Index.end())
{
// Previously unknown object
size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = RawSize, .RawHash = RawHash});
+ m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
m_AccessTimes.emplace_back(GcClock::TickCount());
m_Index.insert_or_assign(HashKey, EntryIndex);
}
@@ -1947,12 +2184,12 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
// TODO: should check if write is idempotent and bail out if it is?
size_t EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = RawSize, .RawHash = RawHash};
+ m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash};
m_AccessTimes.emplace_back(GcClock::TickCount());
m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
}
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
+ m_SlogFile.Append({.RawSize = Value.RawSize, .Location = Loc, .RawHash = Value.RawHash, .Key = HashKey});
m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
}
@@ -1971,12 +2208,9 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
}
m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) {
- DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
- const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
- m_SlogFile.Append(DiskIndexEntry);
- // TODO: Get from caller input
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = 0u;
+ DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
+ m_SlogFile.Append({.RawSize = Value.RawSize, .Location = Location, .RawHash = Value.RawHash, .Key = HashKey});
+
RwLock::ExclusiveLockScope _(m_IndexLock);
if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
@@ -1985,13 +2219,13 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
// content hash to the index entry
size_t EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = RawSize, .RawHash = RawHash});
+ m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
m_AccessTimes[EntryIndex] = GcClock::TickCount();
}
else
{
size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = RawSize, .RawHash = RawHash});
+ m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
m_AccessTimes.emplace_back(GcClock::TickCount());
m_Index.insert_or_assign(HashKey, EntryIndex);
}
@@ -2605,7 +2839,7 @@ TEST_CASE("z$.size")
for (size_t Key = 0; Key < Count; ++Key)
{
const size_t Bucket = Key % 4;
- Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer});
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), ZenCacheValue{.Value = Buffer});
}
CacheSize = Zcs.StorageSize();