aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2022-05-20 12:42:56 +0200
committerStefan Boberg <[email protected]>2022-05-20 12:42:56 +0200
commit5b271be0169b842cdc3d576e48bf0ddc2f122852 (patch)
tree16f501d2190f19a7281ce3f30365817464e146cb /zenserver/cache/structuredcachestore.cpp
parentAdded ZEN_USE_CATCH2 define (diff)
parentfix mac compilation error (diff)
downloadzen-5b271be0169b842cdc3d576e48bf0ddc2f122852.tar.xz
zen-5b271be0169b842cdc3d576e48bf0ddc2f122852.zip
Merge branch 'main' into use-catch2
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp2237
1 files changed, 1811 insertions, 426 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 738e4c1fd..da948fd72 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -14,13 +14,13 @@
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/cidstore.h>
+#include <xxhash.h>
+
#if ZEN_PLATFORM_WINDOWS
# include <zencore/windows.h>
#endif
@@ -30,10 +30,183 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <random>
+#endif
+
//////////////////////////////////////////////////////////////////////////
namespace zen {
+namespace {
+
+#pragma pack(push)
+#pragma pack(1)
+
+ struct CacheBucketIndexHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t PayloadAlignment = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+
+ static_assert(sizeof(CacheBucketIndexHeader) == 32);
+
+ struct LegacyDiskLocation
+ {
+ inline LegacyDiskLocation() = default;
+
+ inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+ : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+ , LowerSize(ValueSize & 0xFFFFffff)
+ , IndexDataSize(IndexSize)
+ {
+ }
+
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize)
+ static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
+ static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file
+ static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary
+ static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value
+ static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format
+
+ static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
+
+ inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ inline uint64_t Size() const { return LowerSize; }
+ inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
+ inline ZenContentType GetContentType() const
+ {
+ ZenContentType ContentType = ZenContentType::kBinary;
+
+ if (IsFlagSet(LegacyDiskLocation::kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
+ }
+
+ if (IsFlagSet(LegacyDiskLocation::kCompressed))
+ {
+ ContentType = ZenContentType::kCompressedBinary;
+ }
+
+ return ContentType;
+ }
+ inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; }
+
+ private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
+ };
+
+ struct LegacyDiskIndexEntry
+ {
+ IoHash Key;
+ LegacyDiskLocation Location;
+ };
+
+#pragma pack(pop)
+
+ static_assert(sizeof(LegacyDiskIndexEntry) == 36);
+
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".slog";
+ const char* LegacyDataExtension = ".sobs";
+
+ std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + IndexExtension);
+ }
+
+ std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + ".tmp" + IndexExtension);
+ }
+
+ std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + LogExtension);
+ }
+
+ std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir)
+ {
+ return BucketDir / (std::string("zen") + LogExtension);
+ }
+
+ std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir)
+ {
+ return BucketDir / (std::string("zen") + LegacyDataExtension);
+ }
+
+ bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured |
+ LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString());
+ return false;
+ }
+ if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
+ {
+ return true;
+ }
+ uint64_t Size = Entry.Location.Size();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+ bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.GetFlags() &
+ ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+ {
+ return true;
+ }
+ uint64_t Size = Entry.Location.Size();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+} // namespace
+
namespace fs = std::filesystem;
static CbObject
@@ -59,7 +232,7 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
WriteFile(Path, Object.GetBuffer().AsIoBuffer());
}
-ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir)
+ZenCacheNamespace::ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir)
: GcStorage(Gc)
, GcContributor(Gc)
, m_RootDir(RootDir)
@@ -75,12 +248,12 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir)
#endif
}
-ZenCacheStore::~ZenCacheStore()
+ZenCacheNamespace::~ZenCacheNamespace()
{
}
bool
-ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
ZEN_TRACE_CPU("Z$::Get");
@@ -118,7 +291,7 @@ ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheVal
}
void
-ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
{
ZEN_TRACE_CPU("Z$::Put");
@@ -154,7 +327,7 @@ ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCa
}
bool
-ZenCacheStore::DropBucket(std::string_view Bucket)
+ZenCacheNamespace::DropBucket(std::string_view Bucket)
{
ZEN_INFO("dropping bucket '{}'", Bucket);
@@ -170,13 +343,13 @@ ZenCacheStore::DropBucket(std::string_view Bucket)
}
void
-ZenCacheStore::Flush()
+ZenCacheNamespace::Flush()
{
m_DiskLayer.Flush();
}
void
-ZenCacheStore::Scrub(ScrubContext& Ctx)
+ZenCacheNamespace::Scrub(ScrubContext& Ctx)
{
if (m_LastScrubTime == Ctx.ScrubTimestamp())
{
@@ -190,11 +363,11 @@ ZenCacheStore::Scrub(ScrubContext& Ctx)
}
void
-ZenCacheStore::GatherReferences(GcContext& GcCtx)
+ZenCacheNamespace::GatherReferences(GcContext& GcCtx)
{
Stopwatch Timer;
- const auto Guard = MakeGuard(
- [this, &Timer] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ const auto Guard =
+ MakeGuard([&] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
access_tracking::AccessTimes AccessTimes;
m_MemLayer.GatherAccessTimes(AccessTimes);
@@ -204,14 +377,14 @@ ZenCacheStore::GatherReferences(GcContext& GcCtx)
}
void
-ZenCacheStore::CollectGarbage(GcContext& GcCtx)
+ZenCacheNamespace::CollectGarbage(GcContext& GcCtx)
{
m_MemLayer.Reset();
m_DiskLayer.CollectGarbage(GcCtx);
}
GcStorageSize
-ZenCacheStore::StorageSize() const
+ZenCacheNamespace::StorageSize() const
{
return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()};
}
@@ -425,6 +598,8 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
using namespace std::literals;
+ m_BlocksBasePath = BucketDir / "blocks";
+
CreateDirectories(BucketDir);
std::filesystem::path ManifestPath{BucketDir / "zen_manifest"};
@@ -470,48 +645,465 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
+ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
- m_BucketDir = BucketDir;
+ ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
+ m_BucketDir / m_BucketName,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
- uint64_t MaxFileOffset = 0;
- uint64_t InvalidEntryCount = 0;
- m_SobsCursor = 0;
- m_TotalSize = 0;
+ namespace fs = std::filesystem;
- m_Index.clear();
+ fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
+
+ // Move index away, we keep it if something goes wrong
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+ if (fs::is_regular_file(IndexPath))
+ {
+ fs::rename(IndexPath, STmpIndexPath);
+ }
+
+ try
+ {
+ m_SlogFile.Flush();
- std::filesystem::path SobsPath{BucketDir / "zen.sobs"};
- std::filesystem::path SlogPath{BucketDir / "zen.slog"};
+ // Write the current state of the location map to a new index state
+ uint64_t LogCount = 0;
+ std::vector<DiskIndexEntry> Entries;
- m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
- m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ {
+ Entries.resize(m_Index.size());
- m_SlogFile.Replay(
- [&](const DiskIndexEntry& Entry) {
- if (Entry.Key == IoHash::Zero)
+ uint64_t EntryIndex = 0;
+ for (auto& Entry : m_Index)
{
- ++InvalidEntryCount;
+ DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = Entry.second.Location;
}
- else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+
+ LogCount = m_SlogFile.GetLogCount();
+ }
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+ CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+ Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
+
+ ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+ ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.Close();
+ EntryCount = Entries.size();
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
+
+ // Restore any previous snapshot
+
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(IndexPath);
+ fs::rename(STmpIndexPath, IndexPath);
+ }
+ }
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadIndexFile()
+{
+ std::vector<DiskIndexEntry> Entries;
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing #{} entries in {}",
+ m_BucketDir / m_BucketName,
+ Entries.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CacheBucketIndexHeader))
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) &&
+ (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+ (Header.EntryCount <= ExpectedEntryCount))
{
- m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ m_PayloadAlignment = Header.PayloadAlignment;
+
+ std::string InvalidEntryReason;
+ for (const DiskIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+
+ return Header.LogPosition;
}
else
{
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ ZEN_WARN("skipping invalid index file '{}'", IndexPath);
}
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size());
- },
- 0);
+ }
+ }
+ return 0;
+}
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
+{
+ std::vector<DiskIndexEntry> Entries;
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (CasLog.Initialize())
+ {
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+ uint64_t ReadCount = EntryCount - SkipEntryCount;
+ m_Index.reserve(ReadCount);
+ uint64_t InvalidEntryCount = 0;
+ CasLog.Replay(
+ [&](const DiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Location.Flags & DiskLocation::kTombStone)
+ {
+ m_Index.erase(Record.Key);
+ return;
+ }
+ if (!ValidateEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ m_Index.insert_or_assign(Record.Key, IndexEntry(Record.Location, GcClock::TickCount()));
+ },
+ SkipEntryCount);
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
+ }
+ }
+ }
+ return 0;
+};
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
+{
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
+
+ if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
+ {
+ return 0;
+ }
+
+ ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName);
+
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
+
+ uint64_t MigratedChunkCount = 0;
+ uint32_t MigratedBlockCount = 0;
+ Stopwatch MigrationTimer;
+ uint64_t TotalSize = 0;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
+ m_BucketDir / m_BucketName,
+ MigratedChunkCount,
+ MigratedBlockCount,
+ NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
+ NiceBytes(TotalSize));
+ });
+
+ uint64_t BlockFileSize = 0;
+ {
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+ BlockFileSize = BlockFile.FileSize();
+ }
+
+ std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
+ uint64_t InvalidEntryCount = 0;
+
+ size_t BlockChunkCount = 0;
+ TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog;
+ LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
+ {
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
+ LegacyLogPath,
+ LegacyDiskIndex.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ if (LegacyCasLog.Initialize())
+ {
+ LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
+ LegacyCasLog.Replay(
+ [&](const LegacyDiskIndexEntry& Record) {
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
+ {
+ LegacyDiskIndex.erase(Record.Key);
+ return;
+ }
+ std::string InvalidEntryReason;
+ if (!ValidateLegacyEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ if (m_Index.contains(Record.Key))
+ {
+ return;
+ }
+ LegacyDiskIndex[Record.Key] = Record;
+ },
+ 0);
+
+ std::vector<IoHash> BadEntries;
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyDiskIndexEntry& Record(Entry.second);
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize)
+ {
+ BlockChunkCount++;
+ continue;
+ }
+ ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
+ BadEntries.push_back(Entry.first);
+ }
+ for (const IoHash& BadHash : BadEntries)
+ {
+ LegacyDiskIndex.erase(BadHash);
+ }
+ InvalidEntryCount += BadEntries.size();
+ }
+ }
if (InvalidEntryCount)
{
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath);
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
}
- m_SobsCursor = (MaxFileOffset + 15) & ~15;
+ if (LegacyDiskIndex.empty())
+ {
+ LegacyCasLog.Close();
+ if (CleanSource)
+ {
+ // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
+ // a manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ }
+ return 0;
+ }
+
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ CreateDirectories(LogPath.parent_path());
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkIndexToChunkHash.reserve(BlockChunkCount);
+ ChunkLocations.reserve(BlockChunkCount);
+
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount);
+
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const IoHash& ChunkHash = Entry.first;
+ const LegacyDiskLocation& Location = Entry.second.Location;
+ if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ uint8_t Flags = 0xff & (Location.Flags() >> 56);
+ DiskLocation NewLocation = DiskLocation(Location.Size(), Flags);
+ LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation});
+ continue;
+ }
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()});
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ TotalSize += Location.Size();
+ }
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+ CasLog.Append(LogEntries);
+
+ m_BlockStore.Split(
+ ChunkLocations,
+ LegacyDataPath,
+ m_BlocksBasePath,
+ MaxBlockSize,
+ BlockStoreDiskLocation::MaxBlockIndex + 1,
+ m_PayloadAlignment,
+ CleanSource,
+ [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
+ const BlockStore::MovedChunksArray& MovedChunks) {
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyDiskLocation& OldLocation = OldEntry.Location;
+ uint8_t Flags = 0xff & (OldLocation.Flags() >> 56);
+ LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)});
+ }
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+ CasLog.Append(LogEntries);
+ CasLog.Flush();
+ if (CleanSource)
+ {
+ std::vector<LegacyDiskIndexEntry> LegacyLogEntries;
+ LegacyLogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyDiskLocation& OldLocation = OldEntry.Location;
+ LegacyDiskLocation NewLocation(OldLocation.Offset(),
+ OldLocation.Size(),
+ 0,
+ OldLocation.Flags() | LegacyDiskLocation::kTombStone);
+ LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation});
+ }
+ LegacyCasLog.Append(LegacyLogEntries);
+ LegacyCasLog.Flush();
+ }
+ MigratedBlockCount++;
+ MigratedChunkCount += MovedChunks.size();
+ });
+
+ LegacyCasLog.Close();
+ CasLog.Close();
+
+ if (CleanSource)
+ {
+ // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
+ // a manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ }
+ return MigratedChunkCount;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
+{
+ m_BucketDir = BucketDir;
+
+ m_TotalSize = 0;
+
+ m_Index.clear();
+
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+
+ if (IsNew)
+ {
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
+ fs::remove(LegacyLogPath);
+ fs::remove(LegacyDataPath);
+ fs::remove(LogPath);
+ fs::remove(IndexPath);
+ fs::remove_all(m_BlocksBasePath);
+ }
+
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
+ uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+
+ CreateDirectories(m_BucketDir);
+
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ std::vector<BlockStoreLocation> KnownLocations;
+ KnownLocations.reserve(m_Index.size());
+ for (const auto& Entry : m_Index)
+ {
+ const DiskLocation& Location = Entry.second.Location;
+ m_TotalSize.fetch_add(Location.Size(), std::memory_order::relaxed);
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
+ KnownLocations.push_back(BlockLocation);
+ }
+
+ m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
+
+ if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ {
+ MakeIndexSnapshot();
+ }
+ // TODO: should validate integrity of container files here
}
void
@@ -532,12 +1124,13 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H
bool
ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue)
{
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);
+
+ OutValue.Value = m_BlockStore.TryGetChunk(Location);
+ if (!OutValue.Value)
{
return false;
}
-
- OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size());
OutValue.Value.SetContentType(Loc.GetContentType());
return true;
@@ -562,23 +1155,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
return false;
}
-void
-ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc,
- const IoHash& HashKey,
- const fs::path& Path,
- std::error_code& Ec)
-{
- ZEN_DEBUG("deleting standalone cache file '{}'", Path);
- fs::remove(Path, Ec);
-
- if (!Ec)
- {
- m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}});
- m_Index.erase(HashKey);
- m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
- }
-}
-
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -588,23 +1164,21 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
RwLock::SharedLockScope _(m_IndexLock);
-
- if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ auto It = m_Index.find(HashKey);
+ if (It == m_Index.end())
{
- IndexEntry& Entry = It.value();
- Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
-
- if (GetInlineCacheValue(Entry.Location, OutValue))
- {
- return true;
- }
-
+ return false;
+ }
+ IndexEntry& Entry = It.value();
+ Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
+ DiskLocation Location = Entry.Location;
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ // We don't need to hold the index lock when we read a standalone file
_.ReleaseNow();
-
- return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue);
+ return GetStandaloneCacheValue(Location, HashKey, OutValue);
}
-
- return false;
+ return GetInlineCacheValue(Location, OutValue);
}
void
@@ -619,54 +1193,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
{
return PutStandaloneCacheValue(HashKey, Value);
}
- else
- {
- // Small object put
-
- uint64_t EntryFlags = 0;
-
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
-
- DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags);
-
- m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16);
-
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
- {
- // Previously unknown object
- m_Index.insert({HashKey, {Loc, GcClock::TickCount()}});
- }
- else
- {
- // TODO: should check if write is idempotent and bail out if it is?
- // this would requiring comparing contents on disk unless we add a
- // content hash to the index entry
- IndexEntry& Entry = It.value();
- Entry.Location = Loc;
- Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
- }
-
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset());
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
- }
+ PutInlineCacheValue(HashKey, Value);
}
void
ZenCacheDiskLayer::CacheBucket::Drop()
{
- // TODO: add error handling
-
- m_SobsFile.Close();
+ m_BlockStore.Close();
m_SlogFile.Close();
DeleteDirectories(m_BucketDir);
}
@@ -674,11 +1207,10 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
- RwLock::SharedLockScope _(m_IndexLock);
-
- m_SobsFile.Flush();
- m_SlogFile.Flush();
+ m_BlockStore.Flush();
+ RwLock::SharedLockScope _(m_IndexLock);
+ MakeIndexSnapshot();
SaveManifest();
}
@@ -724,20 +1256,22 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
ZenCacheValue Value;
- if (GetInlineCacheValue(Loc, Value))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- // Validate contents
+ if (GetStandaloneCacheValue(Loc, HashKey, Value))
+ {
+ // Note: we cannot currently validate contents since we don't
+ // have a content hash!
+ continue;
+ }
}
- else if (GetStandaloneCacheValue(Loc, HashKey, Value))
+ else if (GetInlineCacheValue(Loc, Value))
{
- // Note: we cannot currently validate contents since we don't
- // have a content hash!
- }
- else
- {
- // Value not found
- BadKeys.push_back(HashKey);
+ // Validate contents
+ continue;
}
+ // Value not found
+ BadKeys.push_back(HashKey);
}
}
@@ -754,12 +1288,18 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
// Log a tombstone and delete the in-memory index for the bad entry
- const auto It = m_Index.find(BadKey);
- const DiskLocation& Location = It->second.Location;
- m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}});
+ const auto It = m_Index.find(BadKey);
+ DiskLocation Location = It->second.Location;
+ Location.Flags |= DiskLocation::kTombStone;
+ m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = Location});
m_Index.erase(BadKey);
}
}
+
+ // Let whomever it concerns know about the bad chunks. This could
+ // be used to invalidate higher level data structures more efficiently
+ // than a full validation pass might be able to do
+ Ctx.ReportBadCasChunks(BadKeys);
}
void
@@ -767,68 +1307,95 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences");
- Stopwatch Timer;
- const auto Guard = MakeGuard(
- [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
+ m_BucketDir / m_BucketName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs));
+ });
const GcClock::TimePoint ExpireTime =
GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration();
const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
- RwLock::SharedLockScope _(m_IndexLock);
-
- std::vector<IoHash> ValidKeys;
- std::vector<IoHash> ExpiredKeys;
- std::vector<IoHash> Cids;
- std::vector<IndexMap::value_type> Entries(m_Index.begin(), m_Index.end());
-
- std::sort(Entries.begin(), Entries.end(), [](const auto& LHS, const auto& RHS) {
- return LHS.second.LastAccess < RHS.second.LastAccess;
- });
+ IndexMap Index;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ Index = m_Index;
+ }
- const auto ValidIt = std::lower_bound(Entries.begin(), Entries.end(), ExpireTicks, [](const auto& Kv, auto Ticks) {
- const IndexEntry& Entry = Kv.second;
- return Entry.LastAccess < Ticks;
- });
+ std::vector<IoHash> ExpiredKeys;
+ ExpiredKeys.reserve(1024);
+ std::vector<IoHash> Cids;
Cids.reserve(1024);
- for (auto Kv = ValidIt; Kv != Entries.end(); ++Kv)
+ for (const auto& Entry : Index)
{
- const IoHash& Key = Kv->first;
- const DiskLocation& Loc = Kv->second.Location;
+ const IoHash& Key = Entry.first;
+ if (Entry.second.LastAccess < ExpireTicks)
+ {
+ ExpiredKeys.push_back(Key);
+ continue;
+ }
+
+ const DiskLocation& Loc = Entry.second.Location;
if (Loc.IsFlagSet(DiskLocation::kStructured))
{
- ZenCacheValue CacheValue;
- if (!GetInlineCacheValue(Loc, CacheValue))
+ if (Cids.size() > 1024)
{
- GetStandaloneCacheValue(Loc, Key, CacheValue);
+ GcCtx.ContributeCids(Cids);
+ Cids.clear();
}
- if (CacheValue.Value)
+ ZenCacheValue CacheValue;
{
- ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
- if (Cids.size() > 1024)
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- GcCtx.ContributeCids(Cids);
- Cids.clear();
+ if (!GetStandaloneCacheValue(Loc, Key, CacheValue))
+ {
+ continue;
+ }
+ }
+ else if (!GetInlineCacheValue(Loc, CacheValue))
+ {
+ continue;
}
- CbObject Obj(SharedBuffer{CacheValue.Value});
- Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
}
+
+ ZEN_ASSERT(CacheValue.Value);
+ ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
+ CbObject Obj(SharedBuffer{CacheValue.Value});
+ Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
}
}
- ValidKeys.reserve(std::distance(ValidIt, Entries.end()));
- ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt));
-
- std::transform(ValidIt, Entries.end(), std::back_inserter(ValidKeys), [](const auto& Kv) { return Kv.first; });
- std::transform(Entries.begin(), ValidIt, std::back_inserter(ExpiredKeys), [](const auto& Kv) { return Kv.first; });
-
GcCtx.ContributeCids(Cids);
- GcCtx.ContributeCacheKeys(m_BucketName, std::move(ValidKeys), std::move(ExpiredKeys));
+ GcCtx.ContributeCacheKeys(m_BucketName, std::move(ExpiredKeys));
}
void
@@ -836,203 +1403,282 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage");
- Flush();
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
-
- const uint64_t OldCount = m_Index.size();
- const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
-
- ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir);
-
- Stopwatch Timer;
- const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] {
- const uint64_t NewCount = m_Index.size();
- const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed);
- ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})",
- m_BucketDir,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- OldCount - NewCount,
- NiceBytes(OldTotalSize - NewTotalSize),
- OldCount,
- NiceBytes(OldTotalSize));
+ ZEN_INFO("collecting garbage from '{}'", m_BucketDir / m_BucketName);
+
+ Stopwatch TotalTimer;
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+ uint64_t DeletedCount = 0;
+ uint64_t MovedCount = 0;
+
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO(
+ "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+ "#{} "
+ "of #{} "
+ "entires ({}).",
+ m_BucketDir / m_BucketName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedCount,
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ RwLock::SharedLockScope _(m_IndexLock);
SaveManifest();
});
- if (m_Index.empty())
+ m_SlogFile.Flush();
+
+ std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName);
+ std::vector<IoHash> DeleteCacheKeys;
+ DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
+ GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
+ if (Keep)
+ {
+ return;
+ }
+ DeleteCacheKeys.push_back(ChunkHash);
+ });
+ if (DeleteCacheKeys.empty())
{
+ ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName);
return;
}
- auto AddEntries = [this](std::span<const IoHash> Keys, std::vector<IndexMap::value_type>& OutEntries) {
- for (const IoHash& Key : Keys)
+ std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
+ IndexMap Index;
+ BlockStore::ReclaimSnapshotState BlockStoreState;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.empty())
{
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- OutEntries.push_back(*It);
- }
+ ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
+ return;
}
- };
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
- std::vector<IndexMap::value_type> ValidEntries;
- std::vector<IndexMap::value_type> ExpiredEntries;
-
- AddEntries(GcCtx.ValidCacheKeys(m_BucketName), ValidEntries);
- AddEntries(GcCtx.ExpiredCacheKeys(m_BucketName), ExpiredEntries);
-
- // Remove all standalone file(s)
- // NOTE: This can probably be made asynchronously
- {
- std::error_code Ec;
- ExtendablePathBuilder<256> Path;
+ SaveManifest();
+ Index = m_Index;
- for (const auto& Entry : ExpiredEntries)
+ for (const IoHash& Key : DeleteCacheKeys)
{
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ if (auto It = Index.find(Key); It != Index.end())
{
- Path.Reset();
- BuildPath(Path, Key);
-
- // NOTE: this will update index and log file
- DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec);
-
- if (Ec)
+ DiskIndexEntry Entry = {.Key = It->first, .Location = It->second.Location};
+ if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
{
- ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", Path.ToUtf8(), Ec.message());
- Ec.clear();
+ Entry.Location.Flags |= DiskLocation::kTombStone;
+ ExpiredStandaloneEntries.push_back(Entry);
}
}
}
+ if (GcCtx.IsDeletionMode())
+ {
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ m_Index.erase(Entry.Key);
+ }
+ m_SlogFile.Append(ExpiredStandaloneEntries);
+ }
}
- if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty())
+ if (GcCtx.IsDeletionMode())
{
- // Naive GC implementation of small objects. Needs enough free
- // disk space to store intermediate sob container along side the
- // old container
-
- const auto ResetSobStorage = [this, &ValidEntries]() {
- m_SobsFile.Close();
- m_SlogFile.Close();
+ std::error_code Ec;
+ ExtendablePathBuilder<256> Path;
- const bool IsNew = true;
- m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
- m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ const IoHash& Key = Entry.Key;
+ const DiskLocation& Loc = Entry.Location;
- m_SobsCursor = 0;
- m_TotalSize = 0;
- m_Index.clear();
+ Path.Reset();
+ BuildPath(Path, Key);
+ fs::path FilePath = Path.ToPath();
- for (const auto& Entry : ValidEntries)
{
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
+ {
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
+ continue;
+ }
+ __.ReleaseNow();
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
+ if (fs::is_regular_file(FilePath))
{
- m_SlogFile.Append({.Key = Key, .Location = Loc});
- m_Index.insert({Key, {Loc, GcClock::TickCount()}});
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
+ fs::remove(FilePath, Ec);
}
}
- };
- uint64_t NewContainerSize{};
- for (const auto& Entry : ValidEntries)
- {
- const DiskLocation& Loc = Entry.second.Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false)
+ if (Ec)
{
- NewContainerSize += (Loc.Size() + sizeof(DiskLocation));
+ ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
+ Ec.clear();
+ DiskLocation RestoreLocation = Loc;
+ RestoreLocation.Flags &= ~DiskLocation::kTombStone;
+
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
+ {
+ continue;
+ }
+ m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
+ m_Index.insert({Key, {Loc, GcClock::TickCount()}});
+ m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ continue;
}
+ m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ DeletedSize += Entry.Location.Size();
+ DeletedCount++;
}
+ }
- if (NewContainerSize == 0)
- {
- ResetSobStorage();
- return;
- }
+ TotalChunkCount = Index.size();
- const uint64_t DiskSpaceMargin = (256 << 10);
+ std::vector<IoHash> TotalChunkHashes;
+ TotalChunkHashes.reserve(TotalChunkCount);
+ for (const auto& Entry : Index)
+ {
+ const DiskLocation& Location = Entry.second.Location;
- std::error_code Ec;
- DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec);
- if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin)
+ if (Location.Flags & DiskLocation::kStandaloneFile)
{
- ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)",
- m_BucketDir,
- NiceBytes(NewContainerSize),
- NiceBytes(Space.Free));
- return;
+ continue;
}
+ TotalChunkHashes.push_back(Entry.first);
+ }
- std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"};
- std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"};
-
- // Copy non expired sob(s) to temporary sob container
-
+ if (TotalChunkHashes.empty())
+ {
+ return;
+ }
+ TotalChunkCount = TotalChunkHashes.size();
+
+ std::vector<BlockStoreLocation> ChunkLocations;
+ BlockStore::ChunkIndexArray KeepChunkIndexes;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
+
+ GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ auto KeyIt = Index.find(ChunkHash);
+ const DiskLocation& DiskLocation = KeyIt->second.Location;
+ BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ if (Keep)
{
- BasicFile TmpSobs;
- TCasLogFile<DiskIndexEntry> TmpLog;
- uint64_t TmpCursor{};
- std::vector<uint8_t> Chunk;
+ KeepChunkIndexes.push_back(ChunkIndex);
+ }
+ });
- TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate);
- TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate);
+ size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size();
- for (const auto& Entry : ValidEntries)
- {
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ if (!PerformDelete)
+ {
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
+ uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
+ ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_BucketDir / m_BucketName,
+ DeleteCount,
+ 0, // NiceBytes(TotalSize - NewTotalSize),
+ TotalChunkCount,
+ NiceBytes(TotalSize));
+ return;
+ }
- DiskLocation NewLoc;
+ std::vector<IoHash> DeletedChunks;
+ m_BlockStore.ReclaimSpace(
+ BlockStoreState,
+ ChunkLocations,
+ KeepChunkIndexes,
+ m_PayloadAlignment,
+ false,
+ [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const DiskLocation& OldDiskLocation = Index[ChunkHash].Location;
+ LogEntries.push_back(
+ {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())});
+ }
+ for (const size_t ChunkIndex : RemovedChunks)
+ {
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const DiskLocation& OldDiskLocation = Index[ChunkHash].Location;
+ LogEntries.push_back({.Key = ChunkHash,
+ .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment),
+ m_PayloadAlignment,
+ OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
+ DeletedChunks.push_back(ChunkHash);
+ }
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- NewLoc = DiskLocation(0, Loc.Size(), 0, Loc.GetFlags());
- }
- else
+ m_SlogFile.Append(LogEntries);
+ m_SlogFile.Flush();
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ for (const DiskIndexEntry& Entry : LogEntries)
{
- Chunk.resize(Loc.Size());
- m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset());
-
- NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, Loc.GetFlags());
- TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor);
- TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16);
+ if (Entry.Location.GetFlags() & DiskLocation::kTombStone)
+ {
+ m_Index.erase(Entry.Key);
+ uint64_t ChunkSize = Entry.Location.GetBlockLocation(m_PayloadAlignment).Size;
+ m_TotalSize.fetch_sub(ChunkSize);
+ continue;
+ }
+ m_Index[Entry.Key].Location = Entry.Location;
}
-
- TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc});
}
- }
-
- // Swap state
- try
- {
- fs::path SobsPath{m_BucketDir / "zen.sobs"};
- fs::path SlogPath{m_BucketDir / "zen.slog"};
-
- m_SobsFile.Close();
- m_SlogFile.Close();
-
- fs::remove(SobsPath);
- fs::remove(SlogPath);
-
- fs::rename(TmpSobsPath, SobsPath);
- fs::rename(TmpSlogPath, SlogPath);
+ },
+ [&]() { return GcCtx.CollectSmallObjects(); });
- const bool IsNew = false;
- OpenLog(m_BucketDir, IsNew);
- }
- catch (std::exception& Err)
- {
- ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what());
- ResetSobStorage();
- }
- }
+ GcCtx.DeletedCas(DeletedChunks);
}
void
@@ -1079,96 +1725,187 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac
void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
{
- RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
-
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
+ uint64_t NewFileSize = Value.Value.Size();
TemporaryFile DataFile;
std::error_code Ec;
DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
-
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir));
- }
+ throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
+ }
+
+ bool CleanUpTempFile = false;
+ auto __ = MakeGuard([&] {
+ if (CleanUpTempFile)
+ {
+ std::error_code Ec;
+ std::filesystem::remove(DataFile.GetPath(), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'",
+ DataFile.GetPath(),
+ m_BucketDir,
+ Ec.message());
+ }
+ }
+ });
DataFile.WriteAll(Value.Value, Ec);
-
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to file", NiceBytes(Value.Value.Size())));
+ throw std::system_error(Ec,
+ fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'",
+ NiceBytes(NewFileSize),
+ DataFile.GetPath().string(),
+ m_BucketDir));
}
- // Move file into place (atomically)
-
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
std::filesystem::path FsPath{DataFilePath.ToPath()};
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
-
- if (Ec)
+ // We retry to move the file since it can be held open for read.
+ // This happens if the server processes a Get request for the file or
+ // if we are busy sending the file upstream
+ int RetryCount = 4;
+ do
{
- int RetryCount = 3;
-
- do
+ Ec.clear();
{
- std::filesystem::path ParentPath = FsPath.parent_path();
- CreateDirectories(ParentPath);
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
- if (!Ec)
+ // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file
+ // will be disabled as the file handle has already been closed
+ CleanUpTempFile = Ec ? true : false;
+
+ if (Ec)
{
- break;
+ std::error_code ExistingEc;
+ uint64_t OldFileSize = std::filesystem::file_size(FsPath, ExistingEc);
+ if (!ExistingEc && (OldFileSize == NewFileSize))
+ {
+ ZEN_INFO(
+ "Failed to move temporary file '{}' to '{}' for '{}'. Target file has same size, assuming concurrent write of same "
+ "value, "
+ "move "
+ "failed with reason '{}'",
+ DataFile.GetPath(),
+ FsPath.string(),
+ m_BucketDir,
+ Ec.message());
+ return;
+ }
}
+ }
- std::error_code InnerEc;
- const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc);
+ if (!Ec)
+ {
+ uint8_t EntryFlags = DiskLocation::kStandaloneFile;
- if (!InnerEc && ExistingFileSize == Value.Value.Size())
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- // Concurrent write of same value?
- return;
+ EntryFlags |= DiskLocation::kCompressed;
}
- // Semi arbitrary back-off
- zen::Sleep(1000 * RetryCount);
- } while (RetryCount--);
+ DiskLocation Loc(NewFileSize, EntryFlags);
+ IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("Failed to finalize file '{}'", DataFilePath.ToUtf8()));
+ uint64_t OldFileSize = 0;
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It == m_Index.end())
+ {
+ // Previously unknown object
+ m_Index.insert({HashKey, Entry});
+ }
+ else
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ OldFileSize = It.value().Location.Size();
+ It.value() = Entry;
+ }
+
+ m_SlogFile.Append({.Key = HashKey, .Location = Loc});
+ if (OldFileSize <= NewFileSize)
+ {
+ m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed);
+ }
+ else
+ {
+ m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed);
+ }
+ return;
}
- }
- // Update index
+ std::filesystem::path ParentPath = FsPath.parent_path();
+ if (!std::filesystem::is_directory(ParentPath))
+ {
+ Ec.clear();
+ std::filesystem::create_directories(ParentPath, Ec);
+ if (!Ec)
+ {
+ // Retry without sleep
+ continue;
+ }
+ throw std::system_error(
+ Ec,
+ fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir));
+ }
- uint64_t EntryFlags = DiskLocation::kStandaloneFile;
+ ZEN_INFO("Failed renaming temporary file '{}' to '{}' for put in '{}', pausing and retrying, reason '{}'",
+ DataFile.GetPath().string(),
+ FsPath.string(),
+ m_BucketDir,
+ Ec.message());
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
+ // Semi arbitrary back-off
+ zen::Sleep(200 * (5 - RetryCount)); // Sleep at most for a total of 3 seconds
+ } while (RetryCount-- > 0);
- RwLock::ExclusiveLockScope _(m_IndexLock);
+ throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir));
+}
- DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags);
- IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
+void
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ uint8_t EntryFlags = 0;
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
{
- // Previously unknown object
- m_Index.insert({HashKey, Entry});
+ EntryFlags |= DiskLocation::kStructured;
}
- else
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- // TODO: should check if write is idempotent and bail out if it is?
- It.value() = Entry;
+ EntryFlags |= DiskLocation::kCompressed;
}
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) {
+ DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
+ const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
+ m_SlogFile.Append(DiskIndexEntry);
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ // this would requiring comparing contents on disk unless we add a
+ // content hash to the index entry
+ IndexEntry& Entry = It.value();
+ Entry.Location = Location;
+ Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
+ }
+ else
+ {
+ m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
+ }
+ });
+ m_TotalSize.fetch_add(Value.Value.Size(), std::memory_order::relaxed);
}
//////////////////////////////////////////////////////////////////////////
@@ -1255,10 +1992,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
auto It = m_Buckets.try_emplace(BucketName, BucketName);
Bucket = &It.first->second;
- std::filesystem::path bucketPath = m_RootDir;
- bucketPath /= BucketName;
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName;
- Bucket->OpenOrCreate(bucketPath);
+ Bucket->OpenOrCreate(BucketPath);
}
}
@@ -1273,49 +2010,23 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
void
ZenCacheDiskLayer::DiscoverBuckets()
{
- FileSystemTraversal Traversal;
- struct Visitor : public FileSystemTraversal::TreeVisitor
- {
- virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
- [[maybe_unused]] const path_view& File,
- [[maybe_unused]] uint64_t FileSize) override
- {
- }
-
- virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
- {
- Dirs.push_back((decltype(Dirs)::value_type)(DirectoryName));
- return false;
- }
-
- std::vector<std::filesystem::path::string_type> Dirs;
- } Visit;
-
- Traversal.TraverseFileSystem(m_RootDir, Visit);
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
// Initialize buckets
RwLock::ExclusiveLockScope _(m_Lock);
- for (const auto& BucketName : Visit.Dirs)
+ for (const std::filesystem::path& BucketPath : DirContent.Directories)
{
+ std::string BucketName = PathToUtf8(BucketPath.stem());
// New bucket needs to be created
-
-#if ZEN_PLATFORM_WINDOWS
- std::string BucketName8 = WideToUtf8(BucketName);
-#else
- const auto& BucketName8 = BucketName;
-#endif
-
- if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end())
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
{
}
else
{
- auto InsertResult = m_Buckets.try_emplace(BucketName8, BucketName8);
-
- std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= BucketName8;
+ auto InsertResult = m_Buckets.try_emplace(BucketName, BucketName);
CacheBucket& Bucket = InsertResult.first->second;
@@ -1323,11 +2034,11 @@ ZenCacheDiskLayer::DiscoverBuckets()
if (Bucket.IsOk())
{
- ZEN_INFO("Discovered bucket '{}'", BucketName8);
+ ZEN_INFO("Discovered bucket '{}'", BucketName);
}
else
{
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir);
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
m_Buckets.erase(InsertResult.first);
}
@@ -1363,11 +2074,10 @@ void
ZenCacheDiskLayer::Flush()
{
std::vector<CacheBucket*> Buckets;
- Buckets.reserve(m_Buckets.size());
{
RwLock::SharedLockScope _(m_Lock);
-
+ Buckets.reserve(m_Buckets.size());
for (auto& Kv : m_Buckets)
{
Buckets.push_back(&Kv.second);
@@ -1416,6 +2126,176 @@ ZenCacheDiskLayer::TotalSize() const
return TotalSize;
}
+//////////////////////////// ZenCacheStore
+
+static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc";
+
+ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc)
+{
+ CreateDirectories(BasePath);
+
+ DirectoryContent DirContent;
+ GetDirectoryContent(BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+
+ std::vector<std::string> LegacyBuckets;
+ std::vector<std::string> Namespaces;
+ for (const std::filesystem::path& DirPath : DirContent.Directories)
+ {
+ std::string DirName = PathToUtf8(DirPath.stem());
+ if (DirName.starts_with(NamespaceDiskPrefix))
+ {
+ Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
+ continue;
+ }
+ LegacyBuckets.push_back(DirName);
+ }
+
+ ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), BasePath, LegacyBuckets.size());
+
+ if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
+ {
+ // default (unspecified) and ue4-ddc namespace points to the same namespace instance
+
+ ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName);
+
+ std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
+ CreateDirectories(DefaultNamespaceFolder);
+
+ // Move any non-namespace folders into the default namespace folder
+ for (const std::string& DirName : LegacyBuckets)
+ {
+ std::filesystem::path LegacyFolder = BasePath / DirName;
+ std::filesystem::path NewPath = DefaultNamespaceFolder / DirName;
+ std::error_code Ec;
+ std::filesystem::rename(LegacyFolder, NewPath, Ec);
+ if (Ec)
+ {
+ ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message());
+ }
+ }
+ Namespaces.push_back(std::string(UE4DDCNamespaceName));
+ }
+
+ for (const std::string& NamespaceName : Namespaces)
+ {
+ m_Namespaces[NamespaceName] =
+ std::make_unique<ZenCacheNamespace>(Gc, BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName));
+ }
+}
+
+ZenCacheStore::~ZenCacheStore()
+{
+ m_Namespaces.clear();
+}
+
+bool
+ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->Get(Bucket, HashKey, OutValue);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
+ return false;
+}
+
+void
+ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->Put(Bucket, HashKey, Value);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
+}
+
+bool
+ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->DropBucket(Bucket);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}'", Namespace, Bucket);
+ return false;
+}
+
+void
+ZenCacheStore::Flush()
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
+}
+
+void
+ZenCacheStore::Scrub(ScrubContext& Ctx)
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); });
+}
+
+ZenCacheNamespace*
+ZenCacheStore::GetNamespace(std::string_view Namespace)
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ if (Namespace == DefaultNamespace)
+ {
+ if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ }
+ return nullptr;
+}
+
+void
+ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const
+{
+ std::vector<std::pair<std::string, ZenCacheNamespace&> > Namespaces;
+ {
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ Namespaces.reserve(m_Namespaces.size());
+ for (const auto& Entry : m_Namespaces)
+ {
+ if (Entry.first == DefaultNamespace)
+ {
+ continue;
+ }
+ Namespaces.push_back({Entry.first, *Entry.second});
+ }
+ }
+ for (auto& Entry : Namespaces)
+ {
+ Callback(Entry.first, Entry.second);
+ }
+}
+
+void
+ZenCacheStore::GatherReferences(GcContext& GcCtx)
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.GatherReferences(GcCtx); });
+}
+
+void
+ZenCacheStore::CollectGarbage(GcContext& GcCtx)
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.CollectGarbage(GcCtx); });
+}
+
+GcStorageSize
+ZenCacheStore::StorageSize() const
+{
+ GcStorageSize Size;
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
+ GcStorageSize StoreSize = Store.StorageSize();
+ Size.MemorySize += StoreSize.MemorySize;
+ Size.DiskSize += StoreSize.DiskSize;
+ });
+ return Size;
+}
+
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
@@ -1427,10 +2307,18 @@ namespace testutils {
IoBuffer CreateBinaryCacheValue(uint64_t Size)
{
- std::vector<uint32_t> Data(size_t(Size / sizeof(uint32_t)));
- std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; });
+ static std::random_device rd;
+ static std::mt19937 g(rd());
+
+ std::vector<uint8_t> Values;
+ Values.resize(Size);
+ for (size_t Idx = 0; Idx < Size; ++Idx)
+ {
+ Values[Idx] = static_cast<uint8_t>(Idx);
+ }
+ std::shuffle(Values.begin(), Values.end(), g);
- IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t));
+ IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size());
Buf.SetContentType(ZenContentType::kBinary);
return Buf;
};
@@ -1443,7 +2331,7 @@ TEST_CASE("z$.store")
CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const int kIterationCount = 100;
@@ -1496,8 +2384,8 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256);
@@ -1516,8 +2404,8 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -1539,8 +2427,8 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64);
@@ -1559,8 +2447,8 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -1597,9 +2485,9 @@ TEST_CASE("z$.gc")
};
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
- const auto Bucket = "teardrinker"sv;
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ const auto Bucket = "teardrinker"sv;
// Create a cache record
const IoHash Key = CreateKey(42);
@@ -1635,7 +2523,7 @@ TEST_CASE("z$.gc")
// Expect timestamps to be serialized
{
CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
std::vector<IoHash> Keep;
// Collect garbage with 1 hour max cache duration
@@ -1656,7 +2544,7 @@ TEST_CASE("z$.gc")
{
ScopedTemporaryDirectory TempDir;
CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "fortysixandtwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -1704,7 +2592,7 @@ TEST_CASE("z$.gc")
{
ScopedTemporaryDirectory TempDir;
CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "rightintwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -1737,6 +2625,7 @@ TEST_CASE("z$.gc")
GcCtx.MaxCacheDuration(std::chrono::minutes(2));
GcCtx.CollectSmallObjects(true);
+ Zcs.Flush();
Gc.CollectGarbage(GcCtx);
for (const auto& Key : Keys)
@@ -1751,6 +2640,502 @@ TEST_CASE("z$.gc")
}
}
+// Verifies that a legacy on-disk cache layout (legacy data file + legacy log)
+// is converted correctly on open: the test writes chunks with the current
+// layout, GCs half of them away, rewrites the surviving index/log into the
+// legacy format, deletes the modern files, then reopens and checks that the
+// kept chunks are readable and the collected ones are gone.
+TEST_CASE("z$.legacyconversion")
+{
+	ScopedTemporaryDirectory TempDir;
+
+	uint64_t ChunkSizes[] = {2041,
+							 1123,
+							 1223,
+							 1239,
+							 341,
+							 1412,
+							 912,
+							 774,
+							 341,
+							 431,
+							 554,
+							 1098,
+							 2048,
+							 339 + 64 * 1024,
+							 561 + 64 * 1024,
+							 16 + 64 * 1024,
+							 16 + 64 * 1024,
+							 2048,
+							 2048};
+	size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
+	size_t SingleBlockSize = 0;
+	std::vector<IoBuffer> Chunks;
+	Chunks.reserve(ChunkCount);
+	for (uint64_t Size : ChunkSizes)
+	{
+		Chunks.push_back(testutils::CreateBinaryCacheValue(Size));
+		SingleBlockSize += Size;
+	}
+
+	std::vector<IoHash> ChunkHashes;
+	ChunkHashes.reserve(ChunkCount);
+	for (const IoBuffer& Chunk : Chunks)
+	{
+		ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+	}
+
+	CreateDirectories(TempDir.Path());
+
+	const std::string Bucket = "rightintwo";
+	{
+		CasGc Gc;
+		ZenCacheNamespace Zcs(Gc, TempDir.Path());
+		const GcClock::TimePoint CurrentTime = GcClock::Now();
+
+		for (size_t i = 0; i < ChunkCount; i++)
+		{
+			Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]});
+		}
+
+		// Keep every even-indexed chunk; GC the odd ones away.
+		std::vector<IoHash> KeepChunks;
+		for (size_t i = 0; i < ChunkCount; i += 2)
+		{
+			KeepChunks.push_back(ChunkHashes[i]);
+		}
+		GcContext GcCtx(CurrentTime + std::chrono::hours(2));
+		GcCtx.MaxCacheDuration(std::chrono::minutes(2));
+		GcCtx.CollectSmallObjects(true);
+		GcCtx.ContributeCas(KeepChunks);
+		Zcs.Flush();
+		Gc.CollectGarbage(GcCtx);
+	}
+	std::filesystem::path BucketDir = TempDir.Path() / Bucket;
+	std::filesystem::path BlocksBaseDir = BucketDir / "blocks";
+
+	// Move the compacted block file into the legacy data-file position.
+	std::filesystem::path CasPath = BlockStore::GetBlockPath(BlocksBaseDir, 1);
+	std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir);
+	std::filesystem::remove(LegacyDataPath);
+	std::filesystem::rename(CasPath, LegacyDataPath);
+
+	std::vector<DiskIndexEntry> LogEntries;
+	std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket);
+	if (std::filesystem::is_regular_file(IndexPath))
+	{
+		BasicFile ObjectIndexFile;
+		ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+		uint64_t Size = ObjectIndexFile.FileSize();
+		if (Size >= sizeof(CacheBucketIndexHeader))
+		{
+			// FIX: was `sizeof(sizeof(CacheBucketIndexHeader))`, which is
+			// sizeof(size_t), not the header size, so ExpectedEntryCount was
+			// computed from the wrong payload length.
+			uint64_t ExpectedEntryCount = (Size - sizeof(CacheBucketIndexHeader)) / sizeof(DiskIndexEntry);
+			CacheBucketIndexHeader Header;
+			ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+			if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion &&
+				Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
+			{
+				LogEntries.resize(Header.EntryCount);
+				ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+			}
+		}
+		ObjectIndexFile.Close();
+		std::filesystem::remove(IndexPath);
+	}
+
+	// Pick up any entries still only present in the write-ahead log.
+	std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket);
+	{
+		TCasLogFile<DiskIndexEntry> CasLog;
+		CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+		LogEntries.reserve(CasLog.GetLogCount());
+		CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
+	}
+	TCasLogFile<LegacyDiskIndexEntry> LegacyLog;
+	std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir);
+	LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
+
+	// Translate each modern index entry into the legacy log record format.
+	for (const DiskIndexEntry& Entry : LogEntries)
+	{
+		uint64_t Size;
+		uint64_t Offset;
+		if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+		{
+			Size = Entry.Location.Location.StandaloneSize;
+			Offset = 0;
+		}
+		else
+		{
+			BlockStoreLocation Location = Entry.Location.GetBlockLocation(16);
+			Size = Location.Size;
+			Offset = Location.Offset;
+		}
+		// Legacy format stores the flags in the top byte of the fourth field.
+		LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56);
+		LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation};
+		LegacyLog.Append(LegacyEntry);
+	}
+	LegacyLog.Close();
+
+	// Remove all modern-layout artifacts so only the legacy files remain.
+	std::filesystem::remove_all(BlocksBaseDir);
+	std::filesystem::remove(LogPath);
+	std::filesystem::remove(IndexPath);
+
+	{
+		// Reopening must convert the legacy layout transparently.
+		CasGc Gc;
+		ZenCacheNamespace Zcs(Gc, TempDir.Path());
+
+		for (size_t i = 0; i < ChunkCount; i += 2)
+		{
+			ZenCacheValue Value;
+			CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value));
+			CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value));
+			CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value));
+		}
+	}
+}
+
+// Stress test: concurrent Put/Get against one namespace from a worker pool,
+// interleaved with repeated GC passes that drop a pseudo-random subset of
+// keys, then a final pass verifying every surviving key reads back intact.
+TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
+{
+	// for (uint32_t i = 0; i < 100; ++i)
+	{
+		ScopedTemporaryDirectory TempDir;
+
+		const uint64_t kChunkSize = 1048;
+		const int32_t kChunkCount = 8192;
+
+		struct Chunk
+		{
+			std::string Bucket;
+			IoBuffer Buffer;
+		};
+		std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks;
+		Chunks.reserve(kChunkCount);
+
+		const std::string Bucket1 = "rightinone";
+		const std::string Bucket2 = "rightintwo";
+
+		// Generate kChunkCount unique chunks per bucket; retry on the (rare)
+		// hash collision with an already-generated chunk.
+		for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+		{
+			while (true)
+			{
+				IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+				IoHash Hash = HashBuffer(Chunk);
+				if (Chunks.contains(Hash))
+				{
+					continue;
+				}
+				Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+				break;
+			}
+			while (true)
+			{
+				IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+				IoHash Hash = HashBuffer(Chunk);
+				if (Chunks.contains(Hash))
+				{
+					continue;
+				}
+				Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+				break;
+			}
+		}
+
+		CreateDirectories(TempDir.Path());
+
+		WorkerThreadPool ThreadPool(4);
+		CasGc Gc;
+		ZenCacheNamespace Zcs(Gc, TempDir.Path());
+
+		// Phase 1: insert every chunk concurrently and wait for completion.
+		// NOTE(review): these lambdas capture the loop variable by reference;
+		// the referenced map elements outlive the wait below, but this differs
+		// from the by-value captures used later — confirm intended.
+		{
+			std::atomic<size_t> WorkCompleted = 0;
+			for (const auto& Chunk : Chunks)
+			{
+				ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+					Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
+					WorkCompleted.fetch_add(1);
+				});
+			}
+			while (WorkCompleted < Chunks.size())
+			{
+				Sleep(1);
+			}
+		}
+
+		// All chunks are unique and the same size, so disk usage is exact.
+		const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+		CHECK_EQ(kChunkSize * Chunks.size(), TotalSize);
+
+		// Phase 2: read every chunk back concurrently and verify its hash.
+		{
+			std::atomic<size_t> WorkCompleted = 0;
+			for (const auto& Chunk : Chunks)
+			{
+				ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+					std::string Bucket = Chunk.second.Bucket;
+					IoHash ChunkHash = Chunk.first;
+					ZenCacheValue CacheValue;
+
+					CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+					IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+					CHECK(ChunkHash == Hash);
+					WorkCompleted.fetch_add(1);
+				});
+			}
+			while (WorkCompleted < Chunks.size())
+			{
+				Sleep(1);
+			}
+		}
+		// GcChunkHashes tracks hash -> bucket for every key believed live.
+		std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
+		GcChunkHashes.reserve(Chunks.size());
+		for (const auto& Chunk : Chunks)
+		{
+			GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+		}
+		// Phase 3: insert a second batch concurrently while reading the first
+		// batch and running GC passes that drop a subset of keys each round.
+		{
+			std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks;
+
+			for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+			{
+				{
+					IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+					IoHash Hash = HashBuffer(Chunk);
+					NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+				}
+				{
+					IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+					IoHash Hash = HashBuffer(Chunk);
+					NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+				}
+			}
+
+			std::atomic<size_t> WorkCompleted = 0;
+			std::atomic_uint32_t AddedChunkCount = 0;
+			for (const auto& Chunk : NewChunks)
+			{
+				ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+					Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
+					AddedChunkCount.fetch_add(1);
+					WorkCompleted.fetch_add(1);
+				});
+			}
+
+			// Readers of the original batch tolerate misses: a GC pass below
+			// may legitimately have collected the key already.
+			for (const auto& Chunk : Chunks)
+			{
+				ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+					ZenCacheValue CacheValue;
+					if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+					{
+						CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+					}
+					WorkCompleted.fetch_add(1);
+				});
+			}
+			while (AddedChunkCount.load() < NewChunks.size())
+			{
+				// Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+				for (const auto& Chunk : NewChunks)
+				{
+					ZenCacheValue CacheValue;
+					if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+					{
+						GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+					}
+				}
+				std::vector<IoHash> KeepHashes;
+				KeepHashes.reserve(GcChunkHashes.size());
+				for (const auto& Entry : GcChunkHashes)
+				{
+					KeepHashes.push_back(Entry.first);
+				}
+				// Prune roughly every 155th key (and its +3 neighbor) via
+				// swap-with-last so each GC pass has something to collect.
+				size_t C = 0;
+				while (C < KeepHashes.size())
+				{
+					if (C % 155 == 0)
+					{
+						if (C < KeepHashes.size() - 1)
+						{
+							KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+							KeepHashes.pop_back();
+						}
+						if (C + 3 < KeepHashes.size() - 1)
+						{
+							KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+							KeepHashes.pop_back();
+						}
+					}
+					C++;
+				}
+
+				GcContext GcCtx;
+				GcCtx.CollectSmallObjects(true);
+				GcCtx.ContributeCas(KeepHashes);
+				Zcs.CollectGarbage(GcCtx);
+				// Drop whatever GC actually deleted from the live-key map.
+				CasChunkSet& Deleted = GcCtx.DeletedCas();
+				Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+			}
+
+			while (WorkCompleted < NewChunks.size() + Chunks.size())
+			{
+				Sleep(1);
+			}
+
+			// One final GC pass after all writers/readers have drained.
+			{
+				// Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+				for (const auto& Chunk : NewChunks)
+				{
+					ZenCacheValue CacheValue;
+					if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+					{
+						GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+					}
+				}
+				std::vector<IoHash> KeepHashes;
+				KeepHashes.reserve(GcChunkHashes.size());
+				for (const auto& Entry : GcChunkHashes)
+				{
+					KeepHashes.push_back(Entry.first);
+				}
+				size_t C = 0;
+				while (C < KeepHashes.size())
+				{
+					if (C % 155 == 0)
+					{
+						if (C < KeepHashes.size() - 1)
+						{
+							KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+							KeepHashes.pop_back();
+						}
+						if (C + 3 < KeepHashes.size() - 1)
+						{
+							KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+							KeepHashes.pop_back();
+						}
+					}
+					C++;
+				}
+
+				GcContext GcCtx;
+				GcCtx.CollectSmallObjects(true);
+				GcCtx.ContributeCas(KeepHashes);
+				Zcs.CollectGarbage(GcCtx);
+				CasChunkSet& Deleted = GcCtx.DeletedCas();
+				Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+			}
+		}
+		// Final verification: every key still tracked as live must read back
+		// and hash-match its content.
+		{
+			std::atomic<size_t> WorkCompleted = 0;
+			for (const auto& Chunk : GcChunkHashes)
+			{
+				ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+					ZenCacheValue CacheValue;
+					CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+					CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+					WorkCompleted.fetch_add(1);
+				});
+			}
+			while (WorkCompleted < GcChunkHashes.size())
+			{
+				Sleep(1);
+			}
+		}
+	}
+}
+
+// Verifies namespace routing on the store front-end: records put into the
+// default namespace are reachable only there, and puts into an unknown
+// namespace are dropped without leaking into the default namespace.
+TEST_CASE("z$.namespaces")
+{
+	using namespace testutils;
+
+	// Build a CbObject wrapping a zero-filled binary payload of the given size.
+	const auto MakeValue = [](size_t PayloadSize) -> CbObject {
+		std::vector<uint8_t> Payload(PayloadSize);
+		CbObjectWriter Writer;
+		Writer.AddBinary("Binary"sv, Payload.data(), Payload.size());
+		return Writer.Save();
+	};
+
+	ScopedTemporaryDirectory TempDir;
+	CreateDirectories(TempDir.Path());
+
+	{
+		CasGc Gc;
+		ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+		const auto Bucket = "teardrinker"sv;
+		const auto CustomNamespace = "mynamespace"sv;
+
+		// Store one record in the default namespace.
+		const IoHash Key = CreateKey(42);
+		CbObject Record = MakeValue(4096);
+
+		IoBuffer RecordBuffer = Record.GetBuffer().AsIoBuffer();
+		RecordBuffer.SetContentType(ZenContentType::kCbObject);
+
+		ZenCacheValue PutValue = {.Value = RecordBuffer};
+		Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key, PutValue);
+
+		ZenCacheValue GetValue;
+		CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key, GetValue));
+
+		// The record must not be visible through an unknown namespace.
+		CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue));
+
+		// This should just be dropped for now until we decide how we add namespaces
+		Zcs.Put(CustomNamespace, Bucket, Key, PutValue);
+		CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue));
+
+		const IoHash Key2 = CreateKey(43);
+		CbObject Record2 = MakeValue(4096);
+
+		IoBuffer RecordBuffer2 = Record2.GetBuffer().AsIoBuffer();
+		RecordBuffer2.SetContentType(ZenContentType::kCbObject);
+		ZenCacheValue PutValue2 = {.Value = RecordBuffer2};
+		Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2);
+
+		// A put into an unknown namespace must not leak into the default one.
+		CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
+	}
+}
+
+// Verifies disk-layer overwrite behavior while a value is held open for read:
+// same-size overwrites always succeed; a different-size overwrite throws on
+// Windows (file still mapped for read) but succeeds elsewhere; once the read
+// handle is released the overwrite succeeds everywhere.
+TEST_CASE("z$.blocked.disklayer.put")
+{
+	ScopedTemporaryDirectory TempDir;
+
+	const auto CreateCacheValue = [](size_t Size) -> CbObject {
+		std::vector<uint8_t> Buf;
+		Buf.resize(Size);
+
+		CbObjectWriter Writer;
+		Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+		return Writer.Save();
+	};
+
+	CasGc Gc;
+	ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+
+	// Large enough to land in the disk layer rather than the memory layer.
+	CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
+
+	IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+	Buffer.SetContentType(ZenContentType::kCbObject);
+
+	size_t Key = Buffer.Size();
+	// FIX: hash the full key object; previously only sizeof(uint32_t) bytes of
+	// the size_t were hashed, making the key depend on pointer width/endianness.
+	IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(Key));
+	Zcs.Put("test_bucket", HashKey, {.Value = Buffer});
+
+	// Hold the value open for read; this pins the backing file on Windows.
+	ZenCacheValue BufferGet;
+	CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
+
+	// Overwriting with a value of same size should go fine
+	Zcs.Put("test_bucket", HashKey, {.Value = Buffer});
+
+	CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1);
+	IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
+	Buffer2.SetContentType(ZenContentType::kCbObject);
+# if ZEN_PLATFORM_WINDOWS
+	// On Windows platform, overwriting with different size while we have
+	// it open for read should throw exception if file is held open
+	CHECK_THROWS(Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}));
+# else
+	// Other platforms should handle overwrite just fine
+	Zcs.Put("test_bucket", HashKey, {.Value = Buffer2});
+# endif
+
+	// Release the read handle by resetting the fetched value.
+	BufferGet = ZenCacheValue{};
+
+	// Read access has been removed, we should now be able to overwrite it
+	Zcs.Put("test_bucket", HashKey, {.Value = Buffer2});
+}
+
#endif
void