aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-03 22:23:26 +0200
committerGitHub <[email protected]>2022-05-03 22:23:26 +0200
commitc5b2435192f382fbaa39a8ff67de16ee3b69b7a6 (patch)
tree294bb596d61582744dd7901f6a464c324bdec3d2 /zenserver/cache/structuredcachestore.cpp
parentMerge pull request #84 from EpicGames/de/cleanup-lock-sharding-in-iobuffer (diff)
parentmacos compilation fix (diff)
downloadzen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.tar.xz
zen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.zip
Merge pull request #86 from EpicGames/de/block-store-refactor
structured cache with block store
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp1697
1 files changed, 1392 insertions, 305 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index a9f38e51d..3ba75cd9c 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -14,13 +14,13 @@
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/cidstore.h>
+#include <xxhash.h>
+
#if ZEN_PLATFORM_WINDOWS
# include <zencore/windows.h>
#endif
@@ -30,10 +30,183 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <random>
+#endif
+
//////////////////////////////////////////////////////////////////////////
namespace zen {
+namespace {
+
+#pragma pack(push)
+#pragma pack(1)
+
+ struct CacheBucketIndexHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t PayloadAlignment = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+
+ static_assert(sizeof(CacheBucketIndexHeader) == 32);
+
+ struct LegacyDiskLocation
+ {
+ inline LegacyDiskLocation() = default;
+
+ inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+ : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+ , LowerSize(ValueSize & 0xFFFFffff)
+ , IndexDataSize(IndexSize)
+ {
+ }
+
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize)
+ static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
+ static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file
+ static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary
+ static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value
+ static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format
+
+ static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
+
+ inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ inline uint64_t Size() const { return LowerSize; }
+ inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
+ inline ZenContentType GetContentType() const
+ {
+ ZenContentType ContentType = ZenContentType::kBinary;
+
+ if (IsFlagSet(LegacyDiskLocation::kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
+ }
+
+ if (IsFlagSet(LegacyDiskLocation::kCompressed))
+ {
+ ContentType = ZenContentType::kCompressedBinary;
+ }
+
+ return ContentType;
+ }
+ inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; }
+
+ private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
+ };
+
+ struct LegacyDiskIndexEntry
+ {
+ IoHash Key;
+ LegacyDiskLocation Location;
+ };
+
+#pragma pack(pop)
+
+ static_assert(sizeof(LegacyDiskIndexEntry) == 36);
+
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".slog";
+ const char* LegacyDataExtension = ".sobs";
+
+ std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + IndexExtension);
+ }
+
+ std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + ".tmp" + IndexExtension);
+ }
+
+ std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + LogExtension);
+ }
+
+ std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir)
+ {
+ return BucketDir / (std::string("zen") + LogExtension);
+ }
+
+ std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir)
+ {
+ return BucketDir / (std::string("zen") + LegacyDataExtension);
+ }
+
+ bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured |
+ LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString());
+ return false;
+ }
+ if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
+ {
+ return true;
+ }
+ uint64_t Size = Entry.Location.Size();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+ bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.GetFlags() &
+ ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+ {
+ return true;
+ }
+ uint64_t Size = Entry.Location.Size();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+} // namespace
+
namespace fs = std::filesystem;
static CbObject
@@ -193,8 +366,8 @@ void
ZenCacheStore::GatherReferences(GcContext& GcCtx)
{
Stopwatch Timer;
- const auto Guard = MakeGuard(
- [this, &Timer] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ const auto Guard =
+ MakeGuard([&] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
access_tracking::AccessTimes AccessTimes;
m_MemLayer.GatherAccessTimes(AccessTimes);
@@ -425,6 +598,8 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
using namespace std::literals;
+ m_BlocksBasePath = BucketDir / "blocks";
+
CreateDirectories(BucketDir);
std::filesystem::path ManifestPath{BucketDir / "zen_manifest"};
@@ -470,48 +645,465 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
+ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
- m_BucketDir = BucketDir;
+ ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
+ m_BucketDir / m_BucketName,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
- uint64_t MaxFileOffset = 0;
- uint64_t InvalidEntryCount = 0;
- m_SobsCursor = 0;
- m_TotalSize = 0;
+ namespace fs = std::filesystem;
- m_Index.clear();
+ fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
- std::filesystem::path SobsPath{BucketDir / "zen.sobs"};
- std::filesystem::path SlogPath{BucketDir / "zen.slog"};
+ // Move index away, we keep it if something goes wrong
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+ if (fs::is_regular_file(IndexPath))
+ {
+ fs::rename(IndexPath, STmpIndexPath);
+ }
- m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
- m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ try
+ {
+ m_SlogFile.Flush();
- m_SlogFile.Replay(
- [&](const DiskIndexEntry& Entry) {
- if (Entry.Key == IoHash::Zero)
+ // Write the current state of the location map to a new index state
+ uint64_t LogCount = 0;
+ std::vector<DiskIndexEntry> Entries;
+
+ {
+ Entries.resize(m_Index.size());
+
+ uint64_t EntryIndex = 0;
+ for (auto& Entry : m_Index)
{
- ++InvalidEntryCount;
+ DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = Entry.second.Location;
}
- else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+
+ LogCount = m_SlogFile.GetLogCount();
+ }
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+ CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+ Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
+
+ ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+ ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.Close();
+ EntryCount = Entries.size();
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
+
+ // Restore any previous snapshot
+
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(IndexPath);
+ fs::rename(STmpIndexPath, IndexPath);
+ }
+ }
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadIndexFile()
+{
+ std::vector<DiskIndexEntry> Entries;
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing #{} entries in {}",
+ m_BucketDir / m_BucketName,
+ Entries.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CacheBucketIndexHeader))
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) &&
+ (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+ (Header.EntryCount <= ExpectedEntryCount))
{
- m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ m_PayloadAlignment = Header.PayloadAlignment;
+
+ std::string InvalidEntryReason;
+ for (const DiskIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+
+ return Header.LogPosition;
}
else
{
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ ZEN_WARN("skipping invalid index file '{}'", IndexPath);
}
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size());
- },
- 0);
+ }
+ }
+ return 0;
+}
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
+{
+ std::vector<DiskIndexEntry> Entries;
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (CasLog.Initialize())
+ {
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+ uint64_t ReadCount = EntryCount - SkipEntryCount;
+ m_Index.reserve(ReadCount);
+ uint64_t InvalidEntryCount = 0;
+ CasLog.Replay(
+ [&](const DiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Location.Flags & DiskLocation::kTombStone)
+ {
+ m_Index.erase(Record.Key);
+ return;
+ }
+ if (!ValidateEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ m_Index.insert_or_assign(Record.Key, IndexEntry(Record.Location, GcClock::TickCount()));
+ },
+ SkipEntryCount);
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
+ }
+ }
+ }
+ return 0;
+};
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
+{
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
+
+ if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
+ {
+ return 0;
+ }
+
+ ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName);
+
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
+
+ uint64_t MigratedChunkCount = 0;
+ uint32_t MigratedBlockCount = 0;
+ Stopwatch MigrationTimer;
+ uint64_t TotalSize = 0;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
+ m_BucketDir / m_BucketName,
+ MigratedChunkCount,
+ MigratedBlockCount,
+ NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
+ NiceBytes(TotalSize));
+ });
+
+ uint64_t BlockFileSize = 0;
+ {
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+ BlockFileSize = BlockFile.FileSize();
+ }
+
+ std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
+ uint64_t InvalidEntryCount = 0;
+
+ size_t BlockChunkCount = 0;
+ TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog;
+ LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
+ {
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
+ LegacyLogPath,
+ LegacyDiskIndex.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ if (LegacyCasLog.Initialize())
+ {
+ LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
+ LegacyCasLog.Replay(
+ [&](const LegacyDiskIndexEntry& Record) {
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
+ {
+ LegacyDiskIndex.erase(Record.Key);
+ return;
+ }
+ std::string InvalidEntryReason;
+ if (!ValidateLegacyEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ if (m_Index.contains(Record.Key))
+ {
+ return;
+ }
+ LegacyDiskIndex[Record.Key] = Record;
+ },
+ 0);
+
+ std::vector<IoHash> BadEntries;
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyDiskIndexEntry& Record(Entry.second);
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize)
+ {
+ BlockChunkCount++;
+ continue;
+ }
+ ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
+ BadEntries.push_back(Entry.first);
+ }
+ for (const IoHash& BadHash : BadEntries)
+ {
+ LegacyDiskIndex.erase(BadHash);
+ }
+ InvalidEntryCount += BadEntries.size();
+ }
+ }
if (InvalidEntryCount)
{
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath);
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
+ }
+
+ if (LegacyDiskIndex.empty())
+ {
+ LegacyCasLog.Close();
+ if (CleanSource)
+ {
+ // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
+ // a manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ }
+ return 0;
+ }
+
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ CreateDirectories(LogPath.parent_path());
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkIndexToChunkHash.reserve(BlockChunkCount);
+ ChunkLocations.reserve(BlockChunkCount);
+
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount);
+
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const IoHash& ChunkHash = Entry.first;
+ const LegacyDiskLocation& Location = Entry.second.Location;
+ if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ uint8_t Flags = 0xff & (Location.Flags() >> 56);
+ DiskLocation NewLocation = DiskLocation(Location.Size(), Flags);
+ LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation});
+ continue;
+ }
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()});
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ TotalSize += Location.Size();
+ }
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+ CasLog.Append(LogEntries);
+
+ m_BlockStore.Split(
+ ChunkLocations,
+ LegacyDataPath,
+ m_BlocksBasePath,
+ MaxBlockSize,
+ BlockStoreDiskLocation::MaxBlockIndex + 1,
+ m_PayloadAlignment,
+ CleanSource,
+ [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
+ const BlockStore::MovedChunksArray& MovedChunks) {
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyDiskLocation& OldLocation = OldEntry.Location;
+ uint8_t Flags = 0xff & (OldLocation.Flags() >> 56);
+ LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)});
+ }
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+ CasLog.Append(LogEntries);
+ CasLog.Flush();
+ if (CleanSource)
+ {
+ std::vector<LegacyDiskIndexEntry> LegacyLogEntries;
+ LegacyLogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyDiskLocation& OldLocation = OldEntry.Location;
+ LegacyDiskLocation NewLocation(OldLocation.Offset(),
+ OldLocation.Size(),
+ 0,
+ OldLocation.Flags() | LegacyDiskLocation::kTombStone);
+ LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation});
+ }
+ LegacyCasLog.Append(LegacyLogEntries);
+ LegacyCasLog.Flush();
+ }
+ MigratedBlockCount++;
+ MigratedChunkCount += MovedChunks.size();
+ });
+
+ LegacyCasLog.Close();
+ CasLog.Close();
+
+ if (CleanSource)
+ {
+ // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
+ // a manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ }
+ return MigratedChunkCount;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
+{
+ m_BucketDir = BucketDir;
+
+ m_TotalSize = 0;
+
+ m_Index.clear();
+
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+
+ if (IsNew)
+ {
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
+ fs::remove(LegacyLogPath);
+ fs::remove(LegacyDataPath);
+ fs::remove(LogPath);
+ fs::remove(IndexPath);
+ fs::remove_all(m_BlocksBasePath);
}
- m_SobsCursor = (MaxFileOffset + 15) & ~15;
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
+ uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+
+ CreateDirectories(m_BucketDir);
+
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ std::vector<BlockStoreLocation> KnownLocations;
+ KnownLocations.reserve(m_Index.size());
+ for (const auto& Entry : m_Index)
+ {
+ const DiskLocation& Location = Entry.second.Location;
+ m_TotalSize.fetch_add(Location.Size(), std::memory_order::relaxed);
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
+ KnownLocations.push_back(BlockLocation);
+ }
+
+ m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
+
+ if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ {
+ MakeIndexSnapshot();
+ }
+ // TODO: should validate integrity of container files here
}
void
@@ -532,12 +1124,15 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H
bool
ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue)
{
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);
+
+ Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location);
+ if (!ChunkBlock)
{
return false;
}
- OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size());
+ OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size);
OutValue.Value.SetContentType(Loc.GetContentType());
return true;
@@ -562,23 +1157,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
return false;
}
-void
-ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc,
- const IoHash& HashKey,
- const fs::path& Path,
- std::error_code& Ec)
-{
- ZEN_DEBUG("deleting standalone cache file '{}'", Path);
- fs::remove(Path, Ec);
-
- if (!Ec)
- {
- m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}});
- m_Index.erase(HashKey);
- m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
- }
-}
-
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -593,15 +1171,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
{
IndexEntry& Entry = It.value();
Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
+ DiskLocation Location = Entry.Location;
+ _.ReleaseNow();
- if (GetInlineCacheValue(Entry.Location, OutValue))
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
- return true;
+ return GetStandaloneCacheValue(Location, HashKey, OutValue);
}
-
- _.ReleaseNow();
-
- return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue);
+ return GetInlineCacheValue(Location, OutValue);
}
return false;
@@ -619,54 +1196,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
{
return PutStandaloneCacheValue(HashKey, Value);
}
- else
- {
- // Small object put
-
- uint64_t EntryFlags = 0;
-
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
-
- DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags);
-
- m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16);
-
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
- {
- // Previously unknown object
- m_Index.insert({HashKey, {Loc, GcClock::TickCount()}});
- }
- else
- {
- // TODO: should check if write is idempotent and bail out if it is?
- // this would requiring comparing contents on disk unless we add a
- // content hash to the index entry
- IndexEntry& Entry = It.value();
- Entry.Location = Loc;
- Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
- }
-
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset());
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
- }
+ PutInlineCacheValue(HashKey, Value);
}
void
ZenCacheDiskLayer::CacheBucket::Drop()
{
- // TODO: add error handling
-
- m_SobsFile.Close();
+ m_BlockStore.Close();
m_SlogFile.Close();
DeleteDirectories(m_BucketDir);
}
@@ -674,11 +1210,10 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
- RwLock::SharedLockScope _(m_IndexLock);
-
- m_SobsFile.Flush();
- m_SlogFile.Flush();
+ m_BlockStore.Flush();
+ RwLock::SharedLockScope _(m_IndexLock);
+ MakeIndexSnapshot();
SaveManifest();
}
@@ -724,20 +1259,22 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
ZenCacheValue Value;
- if (GetInlineCacheValue(Loc, Value))
- {
- // Validate contents
- }
- else if (GetStandaloneCacheValue(Loc, HashKey, Value))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- // Note: we cannot currently validate contents since we don't
- // have a content hash!
+ if (GetStandaloneCacheValue(Loc, HashKey, Value))
+ {
+ // Note: we cannot currently validate contents since we don't
+ // have a content hash!
+ continue;
+ }
}
- else
+ else if (GetInlineCacheValue(Loc, Value))
{
- // Value not found
- BadKeys.push_back(HashKey);
+ // Validate contents
+ continue;
}
+ // Value not found
+ BadKeys.push_back(HashKey);
}
}
@@ -754,12 +1291,18 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
// Log a tombstone and delete the in-memory index for the bad entry
- const auto It = m_Index.find(BadKey);
- const DiskLocation& Location = It->second.Location;
- m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}});
+ const auto It = m_Index.find(BadKey);
+ DiskLocation Location = It->second.Location;
+ Location.Flags |= DiskLocation::kTombStone;
+ m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = Location});
m_Index.erase(BadKey);
}
}
+
+ // Let whomever it concerns know about the bad chunks. This could
+ // be used to invalidate higher level data structures more efficiently
+ // than a full validation pass might be able to do
+ Ctx.ReportBadCasChunks(BadKeys);
}
void
@@ -767,68 +1310,95 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences");
- Stopwatch Timer;
- const auto Guard = MakeGuard(
- [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
+ m_BucketDir / m_BucketName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs));
+ });
const GcClock::TimePoint ExpireTime =
GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration();
const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
- RwLock::SharedLockScope _(m_IndexLock);
-
- std::vector<IoHash> ValidKeys;
- std::vector<IoHash> ExpiredKeys;
- std::vector<IoHash> Cids;
- std::vector<IndexMap::value_type> Entries(m_Index.begin(), m_Index.end());
-
- std::sort(Entries.begin(), Entries.end(), [](const auto& LHS, const auto& RHS) {
- return LHS.second.LastAccess < RHS.second.LastAccess;
- });
+ IndexMap Index;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ Index = m_Index;
+ }
- const auto ValidIt = std::lower_bound(Entries.begin(), Entries.end(), ExpireTicks, [](const auto& Kv, auto Ticks) {
- const IndexEntry& Entry = Kv.second;
- return Entry.LastAccess < Ticks;
- });
+ std::vector<IoHash> ExpiredKeys;
+ ExpiredKeys.reserve(1024);
+ std::vector<IoHash> Cids;
Cids.reserve(1024);
- for (auto Kv = ValidIt; Kv != Entries.end(); ++Kv)
+ for (const auto& Entry : Index)
{
- const IoHash& Key = Kv->first;
- const DiskLocation& Loc = Kv->second.Location;
+ const IoHash& Key = Entry.first;
+ if (Entry.second.LastAccess < ExpireTicks)
+ {
+ ExpiredKeys.push_back(Key);
+ continue;
+ }
+
+ const DiskLocation& Loc = Entry.second.Location;
if (Loc.IsFlagSet(DiskLocation::kStructured))
{
- ZenCacheValue CacheValue;
- if (!GetInlineCacheValue(Loc, CacheValue))
+ if (Cids.size() > 1024)
{
- GetStandaloneCacheValue(Loc, Key, CacheValue);
+ GcCtx.ContributeCids(Cids);
+ Cids.clear();
}
- if (CacheValue.Value)
+ ZenCacheValue CacheValue;
{
- ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
- if (Cids.size() > 1024)
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ if (!GetStandaloneCacheValue(Loc, Key, CacheValue))
+ {
+ continue;
+ }
+ }
+ else if (!GetInlineCacheValue(Loc, CacheValue))
{
- GcCtx.ContributeCids(Cids);
- Cids.clear();
+ continue;
}
- CbObject Obj(SharedBuffer{CacheValue.Value});
- Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
}
+
+ ZEN_ASSERT(CacheValue.Value);
+ ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
+ CbObject Obj(SharedBuffer{CacheValue.Value});
+ Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
}
}
- ValidKeys.reserve(std::distance(ValidIt, Entries.end()));
- ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt));
-
- std::transform(ValidIt, Entries.end(), std::back_inserter(ValidKeys), [](const auto& Kv) { return Kv.first; });
- std::transform(Entries.begin(), ValidIt, std::back_inserter(ExpiredKeys), [](const auto& Kv) { return Kv.first; });
-
GcCtx.ContributeCids(Cids);
- GcCtx.ContributeCacheKeys(m_BucketName, std::move(ValidKeys), std::move(ExpiredKeys));
+ GcCtx.ContributeCacheKeys(m_BucketName, std::move(ExpiredKeys));
}
void
@@ -836,203 +1406,276 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage");
- Flush();
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
-
- const uint64_t OldCount = m_Index.size();
- const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
-
- ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir);
-
- Stopwatch Timer;
- const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] {
- const uint64_t NewCount = m_Index.size();
- const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed);
- ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})",
- m_BucketDir,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- OldCount - NewCount,
- NiceBytes(OldTotalSize - NewTotalSize),
- OldCount,
- NiceBytes(OldTotalSize));
+ ZEN_INFO("collecting garbage from '{}'", m_BucketDir / m_BucketName);
+
+ Stopwatch TotalTimer;
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+ uint64_t DeletedCount = 0;
+ uint64_t MovedCount = 0;
+
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO(
+ "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+ "#{} "
+ "of #{} "
+ "entires ({}).",
+ m_BucketDir / m_BucketName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedCount,
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ RwLock::SharedLockScope _(m_IndexLock);
SaveManifest();
});
- if (m_Index.empty())
+ m_SlogFile.Flush();
+
+ std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName);
+ std::vector<IoHash> DeleteCacheKeys;
+ DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
+ GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
+ if (Keep)
+ {
+ return;
+ }
+ DeleteCacheKeys.push_back(ChunkHash);
+ });
+ if (DeleteCacheKeys.empty())
{
+ ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName);
return;
}
- auto AddEntries = [this](std::span<const IoHash> Keys, std::vector<IndexMap::value_type>& OutEntries) {
- for (const IoHash& Key : Keys)
+ std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
+ IndexMap Index;
+ BlockStore::ReclaimSnapshotState BlockStoreState;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
{
- if (auto It = m_Index.find(Key); It != m_Index.end())
+ if (m_Index.empty())
{
- OutEntries.push_back(*It);
+ ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
+ return;
}
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
}
- };
-
- std::vector<IndexMap::value_type> ValidEntries;
- std::vector<IndexMap::value_type> ExpiredEntries;
-
- AddEntries(GcCtx.ValidCacheKeys(m_BucketName), ValidEntries);
- AddEntries(GcCtx.ExpiredCacheKeys(m_BucketName), ExpiredEntries);
-
- // Remove all standalone file(s)
- // NOTE: This can probably be made asynchronously
- {
- std::error_code Ec;
- ExtendablePathBuilder<256> Path;
+ SaveManifest();
+ Index = m_Index;
- for (const auto& Entry : ExpiredEntries)
+ for (const IoHash& Key : DeleteCacheKeys)
{
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ if (auto It = Index.find(Key); It != Index.end())
{
- Path.Reset();
- BuildPath(Path, Key);
-
- // NOTE: this will update index and log file
- DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec);
-
- if (Ec)
+ DiskIndexEntry Entry = {.Key = It->first, .Location = It->second.Location};
+ if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
{
- ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", Path.ToUtf8(), Ec.message());
- Ec.clear();
+ Entry.Location.Flags |= DiskLocation::kTombStone;
+ ExpiredStandaloneEntries.push_back(Entry);
}
}
}
+ if (GcCtx.IsDeletionMode())
+ {
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ m_Index.erase(Entry.Key);
+ }
+ m_SlogFile.Append(ExpiredStandaloneEntries);
+ }
}
- if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty())
+ if (GcCtx.IsDeletionMode())
{
- // Naive GC implementation of small objects. Needs enough free
- // disk space to store intermediate sob container along side the
- // old container
-
- const auto ResetSobStorage = [this, &ValidEntries]() {
- m_SobsFile.Close();
- m_SlogFile.Close();
+ std::error_code Ec;
+ ExtendablePathBuilder<256> Path;
- const bool IsNew = true;
- m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
- m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ const IoHash& Key = Entry.Key;
+ const DiskLocation& Loc = Entry.Location;
- m_SobsCursor = 0;
- m_TotalSize = 0;
- m_Index.clear();
+ Path.Reset();
+ BuildPath(Path, Key);
- for (const auto& Entry : ValidEntries)
{
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
{
- m_SlogFile.Append({.Key = Key, .Location = Loc});
- m_Index.insert({Key, {Loc, GcClock::TickCount()}});
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
+ continue;
}
+ ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
+ fs::remove(Path.c_str(), Ec);
}
- };
-
- uint64_t NewContainerSize{};
- for (const auto& Entry : ValidEntries)
- {
- const DiskLocation& Loc = Entry.second.Location;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false)
+ if (Ec)
{
- NewContainerSize += (Loc.Size() + sizeof(DiskLocation));
+ ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
+ Ec.clear();
+ DiskLocation RestoreLocation = Loc;
+ RestoreLocation.Flags &= ~DiskLocation::kTombStone;
+
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
+ {
+ continue;
+ }
+ m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
+ m_Index.insert({Key, {Loc, GcClock::TickCount()}});
+ m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ continue;
}
+ m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ DeletedSize += Entry.Location.Size();
+ DeletedCount++;
}
+ }
- if (NewContainerSize == 0)
- {
- ResetSobStorage();
- return;
- }
+ TotalChunkCount = Index.size();
- const uint64_t DiskSpaceMargin = (256 << 10);
+ std::vector<IoHash> TotalChunkHashes;
+ TotalChunkHashes.reserve(TotalChunkCount);
+ for (const auto& Entry : Index)
+ {
+ const DiskLocation& Location = Entry.second.Location;
- std::error_code Ec;
- DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec);
- if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin)
+ if (Location.Flags & DiskLocation::kStandaloneFile)
{
- ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)",
- m_BucketDir,
- NiceBytes(NewContainerSize),
- NiceBytes(Space.Free));
- return;
+ continue;
}
+ TotalChunkHashes.push_back(Entry.first);
+ }
- std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"};
- std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"};
-
- // Copy non expired sob(s) to temporary sob container
-
+ if (TotalChunkHashes.empty())
+ {
+ return;
+ }
+ TotalChunkCount = TotalChunkHashes.size();
+
+ std::vector<BlockStoreLocation> ChunkLocations;
+ BlockStore::ChunkIndexArray KeepChunkIndexes;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
+
+ GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ auto KeyIt = Index.find(ChunkHash);
+ const DiskLocation& DiskLocation = KeyIt->second.Location;
+ BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ if (Keep)
{
- BasicFile TmpSobs;
- TCasLogFile<DiskIndexEntry> TmpLog;
- uint64_t TmpCursor{};
- std::vector<uint8_t> Chunk;
+ KeepChunkIndexes.push_back(ChunkIndex);
+ }
+ });
- TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate);
- TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate);
+ size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size();
- for (const auto& Entry : ValidEntries)
- {
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ if (!PerformDelete)
+ {
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
+ uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
+ ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_BucketDir / m_BucketName,
+ DeleteCount,
+ 0, // NiceBytes(TotalSize - NewTotalSize),
+ TotalChunkCount,
+ NiceBytes(TotalSize));
+ return;
+ }
- DiskLocation NewLoc;
+ std::vector<IoHash> DeletedChunks;
+ m_BlockStore.ReclaimSpace(
+ BlockStoreState,
+ ChunkLocations,
+ KeepChunkIndexes,
+ m_PayloadAlignment,
+ false,
+ [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const DiskLocation& OldDiskLocation = Index[ChunkHash].Location;
+ LogEntries.push_back(
+ {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())});
+ }
+ for (const size_t ChunkIndex : RemovedChunks)
+ {
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const DiskLocation& OldDiskLocation = Index[ChunkHash].Location;
+ LogEntries.push_back({.Key = ChunkHash,
+ .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment),
+ m_PayloadAlignment,
+ OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
+ DeletedChunks.push_back(ChunkHash);
+ }
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- NewLoc = DiskLocation(0, Loc.Size(), 0, Loc.GetFlags());
- }
- else
+ m_SlogFile.Append(LogEntries);
+ m_SlogFile.Flush();
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ for (const DiskIndexEntry& Entry : LogEntries)
{
- Chunk.resize(Loc.Size());
- m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset());
-
- NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, Loc.GetFlags());
- TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor);
- TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16);
+ if (Entry.Location.GetFlags() & DiskLocation::kTombStone)
+ {
+ m_Index.erase(Entry.Key);
+ uint64_t ChunkSize = Entry.Location.GetBlockLocation(m_PayloadAlignment).Size;
+ m_TotalSize.fetch_sub(ChunkSize);
+ continue;
+ }
+ m_Index[Entry.Key].Location = Entry.Location;
}
-
- TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc});
}
- }
-
- // Swap state
- try
- {
- fs::path SobsPath{m_BucketDir / "zen.sobs"};
- fs::path SlogPath{m_BucketDir / "zen.slog"};
-
- m_SobsFile.Close();
- m_SlogFile.Close();
-
- fs::remove(SobsPath);
- fs::remove(SlogPath);
-
- fs::rename(TmpSobsPath, SobsPath);
- fs::rename(TmpSlogPath, SlogPath);
+ },
+ [&]() { return GcCtx.CollectSmallObjects(); });
- const bool IsNew = false;
- OpenLog(m_BucketDir, IsNew);
- }
- catch (std::exception& Err)
- {
- ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what());
- ResetSobStorage();
- }
- }
+ GcCtx.DeletedCas(DeletedChunks);
}
void
@@ -1144,16 +1787,20 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
// Update index
- uint64_t EntryFlags = DiskLocation::kStandaloneFile;
+ uint8_t EntryFlags = DiskLocation::kStandaloneFile;
if (Value.Value.GetContentType() == ZenContentType::kCbObject)
{
EntryFlags |= DiskLocation::kStructured;
}
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags);
+ DiskLocation Loc(Value.Value.Size(), EntryFlags);
IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
if (auto It = m_Index.find(HashKey); It == m_Index.end())
@@ -1171,6 +1818,41 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
}
+void
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ uint8_t EntryFlags = 0;
+
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
+
+ BlockStoreLocation BlockStoreLocation = m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment);
+ DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
+ const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
+ m_SlogFile.Append(DiskIndexEntry);
+ m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed);
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ // this would requiring comparing contents on disk unless we add a
+ // content hash to the index entry
+ IndexEntry& Entry = It.value();
+ Entry.Location = Location;
+ Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
+ }
+ else
+ {
+ m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir)
@@ -1363,6 +2045,7 @@ void
ZenCacheDiskLayer::Flush()
{
std::vector<CacheBucket*> Buckets;
+
{
RwLock::SharedLockScope _(m_Lock);
Buckets.reserve(m_Buckets.size());
@@ -1417,6 +2100,9 @@ ZenCacheDiskLayer::TotalSize() const
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
+}
+
+namespace zen {
using namespace std::literals;
@@ -1425,10 +2111,18 @@ namespace testutils {
IoBuffer CreateBinaryCacheValue(uint64_t Size)
{
- std::vector<uint32_t> Data(size_t(Size / sizeof(uint32_t)));
- std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; });
+ static std::random_device rd;
+ static std::mt19937 g(rd());
- IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t));
+ std::vector<uint8_t> Values;
+ Values.resize(Size);
+ for (size_t Idx = 0; Idx < Size; ++Idx)
+ {
+ Values[Idx] = static_cast<uint8_t>(Idx);
+ }
+ std::shuffle(Values.begin(), Values.end(), g);
+
+ IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size());
Buf.SetContentType(ZenContentType::kBinary);
return Buf;
};
@@ -1735,6 +2429,7 @@ TEST_CASE("z$.gc")
GcCtx.MaxCacheDuration(std::chrono::minutes(2));
GcCtx.CollectSmallObjects(true);
+ Zcs.Flush();
Gc.CollectGarbage(GcCtx);
for (const auto& Key : Keys)
@@ -1749,6 +2444,398 @@ TEST_CASE("z$.gc")
}
}
+TEST_CASE("z$.legacyconversion")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ uint64_t ChunkSizes[] = {2041,
+ 1123,
+ 1223,
+ 1239,
+ 341,
+ 1412,
+ 912,
+ 774,
+ 341,
+ 431,
+ 554,
+ 1098,
+ 2048,
+ 339 + 64 * 1024,
+ 561 + 64 * 1024,
+ 16 + 64 * 1024,
+ 16 + 64 * 1024,
+ 2048,
+ 2048};
+ size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
+ size_t SingleBlockSize = 0;
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(ChunkCount);
+ for (uint64_t Size : ChunkSizes)
+ {
+ Chunks.push_back(testutils::CreateBinaryCacheValue(Size));
+ SingleBlockSize += Size;
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(ChunkCount);
+ for (const IoBuffer& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CreateDirectories(TempDir.Path());
+
+ const std::string Bucket = "rightintwo";
+ {
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, TempDir.Path());
+ const GcClock::TimePoint CurrentTime = GcClock::Now();
+
+ for (size_t i = 0; i < ChunkCount; i++)
+ {
+ Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]});
+ }
+
+ std::vector<IoHash> KeepChunks;
+ for (size_t i = 0; i < ChunkCount; i += 2)
+ {
+ KeepChunks.push_back(ChunkHashes[i]);
+ }
+ GcContext GcCtx(CurrentTime + std::chrono::hours(2));
+ GcCtx.MaxCacheDuration(std::chrono::minutes(2));
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepChunks);
+ Zcs.Flush();
+ Gc.CollectGarbage(GcCtx);
+ }
+ std::filesystem::path BucketDir = TempDir.Path() / Bucket;
+ std::filesystem::path BlocksBaseDir = BucketDir / "blocks";
+
+ std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1);
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir);
+ std::filesystem::remove(LegacyDataPath);
+ std::filesystem::rename(CasPath, LegacyDataPath);
+
+ std::vector<DiskIndexEntry> LogEntries;
+ std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket);
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CacheBucketIndexHeader))
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion &&
+ Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
+ {
+ LogEntries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ }
+ }
+ ObjectIndexFile.Close();
+ std::filesystem::remove(IndexPath);
+ }
+
+ std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket);
+ {
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ LogEntries.reserve(CasLog.GetLogCount());
+ CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
+ }
+ TCasLogFile<LegacyDiskIndexEntry> LegacyLog;
+ std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir);
+ LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
+
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ uint64_t Size;
+ uint64_t Offset;
+ if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ Size = Entry.Location.Location.StandaloneSize;
+ Offset = 0;
+ }
+ else
+ {
+ BlockStoreLocation Location = Entry.Location.GetBlockLocation(16);
+ Size = Location.Size;
+ Offset = Location.Offset;
+ }
+ LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56);
+ LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation};
+ LegacyLog.Append(LegacyEntry);
+ }
+ LegacyLog.Close();
+
+ std::filesystem::remove_all(BlocksBaseDir);
+ std::filesystem::remove(LogPath);
+ std::filesystem::remove(IndexPath);
+
+ {
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, TempDir.Path());
+
+ for (size_t i = 0; i < ChunkCount; i += 2)
+ {
+ ZenCacheValue Value;
+ CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value));
+ CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value));
+ }
+ }
+}
+
+TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
+{
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 8192;
+
+ struct Chunk
+ {
+ std::string Bucket;
+ IoBuffer Buffer;
+ };
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks;
+ Chunks.reserve(kChunkCount);
+
+ const std::string Bucket1 = "rightinone";
+ const std::string Bucket2 = "rightintwo";
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ while (true)
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ break;
+ }
+ while (true)
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ break;
+ }
+ }
+
+ CreateDirectories(TempDir.Path());
+
+ WorkerThreadPool ThreadPool(4);
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, TempDir.Path());
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+
+ const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * Chunks.size(), TotalSize);
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
+
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+ std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
+ GcChunkHashes.reserve(Chunks.size());
+ for (const auto& Chunk : Chunks)
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ {
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks;
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ }
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ }
+ }
+
+ std::atomic<size_t> WorkCompleted = 0;
+ std::atomic_uint32_t AddedChunkCount = 0;
+ for (const auto& Chunk : NewChunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ }
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (AddedChunkCount.load() < NewChunks.size())
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+
+ while (WorkCompleted < NewChunks.size() + Chunks.size())
+ {
+ Sleep(1);
+ }
+
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+ }
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : GcChunkHashes)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < GcChunkHashes.size())
+ {
+ Sleep(1);
+ }
+ }
+ }
+}
+
#endif
void