aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2022-04-06 15:46:29 +0200
committer Dan Engelbrecht <[email protected]> 2022-04-12 22:20:47 +0200
commit 31dd0f8906aa5a27b8c453c72f6d10964a3be9eb (patch)
tree 9f4a7eb3bd9d6614bbf38c89f158c52ab7975b97 /zenserver/cache/structuredcachestore.cpp
parent Merge pull request #72 from EpicGames/de/set-ulimit (diff)
download zen-31dd0f8906aa5a27b8c453c72f6d10964a3be9eb.tar.xz
zen-31dd0f8906aa5a27b8c453c72f6d10964a3be9eb.zip
structured cache with block store
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r-- zenserver/cache/structuredcachestore.cpp 1977
1 files changed, 1758 insertions, 219 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 738e4c1fd..c5ccef523 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -14,13 +14,13 @@
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/cidstore.h>
+#include <xxhash.h>
+
#if ZEN_PLATFORM_WINDOWS
# include <zencore/windows.h>
#endif
@@ -30,10 +30,220 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <random>
+#endif
+
//////////////////////////////////////////////////////////////////////////
+#pragma pack(push)
+#pragma pack(1)
+
namespace zen {
+namespace {
+
+#pragma pack(push)
+#pragma pack(1)
+
+ // Fixed-size header stored at offset 0 of a bucket index snapshot file.
+ // MakeIndexSnapshot() writes it followed by EntryCount DiskIndexEntry records;
+ // ReadIndexFile() validates Magic, Version, Checksum and PayloadAlignment before trusting it.
+ // The struct is declared under #pragma pack(1) and its size is pinned by the static_assert below.
+ struct CacheBucketIndexHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic; // Must equal ExpectedMagic for the file to be accepted
+ uint32_t Version = CurrentVersion; // Must equal CurrentVersion for the file to be accepted
+ uint64_t EntryCount = 0; // Number of DiskIndexEntry records stored after the header
+ uint64_t LogPosition = 0; // Log entry count at snapshot time; the reader skips this many log entries
+ uint32_t PayloadAlignment = 0; // Payload alignment of the bucket; a value of 0 is rejected by the reader
+ uint32_t Checksum = 0; // XXH32 over every header byte that precedes this field
+
+ // Hashes the header starting at Magic and stopping short of the trailing Checksum field,
+ // so the result can be stored in (and later compared against) Checksum itself.
+ static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+
+ static_assert(sizeof(CacheBucketIndexHeader) == 32);
+
+ // Location record used by the legacy (pre block store) bucket log.
+ // A 64-bit word packs the payload offset (low 48 bits, kOffsetMask) together with
+ // flag bits (top 8 bits, kFlagsMask); the payload size is kept in a separate 32-bit field.
+ // Only used when reading/rewriting the legacy log during migration.
+ struct LegacyDiskLocation
+ {
+ inline LegacyDiskLocation() = default;
+
+ // Builds a location from its parts. Only the low 32 bits of ValueSize are stored.
+ inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+ : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+ , LowerSize(ValueSize & 0xFFFFffff)
+ , IndexDataSize(IndexSize)
+ {
+ }
+
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize)
+ static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
+ static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file
+ static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary
+ static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value
+ static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format
+
+ // Plain bitwise OR; the caller is responsible for Offset and Flags not overlapping.
+ static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
+
+ inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ // NOTE(review): returns LowerSize only; the kSizeMask bits are never written by the constructor
+ // nor read here, so sizes are effectively limited to 32 bits in this struct - confirm this is intended.
+ inline uint64_t Size() const { return LowerSize; }
+ // Returns the masked bits (non-zero when the flag is set), not a normalized boolean.
+ inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
+ // Maps the flag bits to a content type. kCompressed is tested last and therefore
+ // wins over kStructured when both bits are set; no flag yields kBinary.
+ inline ZenContentType GetContentType() const
+ {
+ ZenContentType ContentType = ZenContentType::kBinary;
+
+ if (IsFlagSet(LegacyDiskLocation::kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
+ }
+
+ if (IsFlagSet(LegacyDiskLocation::kCompressed))
+ {
+ ContentType = ZenContentType::kCompressedBinary;
+ }
+
+ return ContentType;
+ }
+ // Flag bits in their original (top byte) position.
+ inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; }
+
+ private:
+ uint64_t OffsetAndFlags = 0; // Offset in the low 48 bits, flags in the top 8 bits
+ uint32_t LowerSize = 0; // Low 32 bits of the payload size
+ uint32_t IndexDataSize = 0; // Stored by the constructor; not read by any accessor in this struct
+ };
+
+ // One record of the legacy bucket log: a key and where its payload lives.
+ // Used as the record type when the legacy log is replayed and rewritten during migration.
+ struct LegacyDiskIndexEntry
+ {
+ IoHash Key; // Cache key
+ LegacyDiskLocation Location; // Offset, size and flags of the value in the legacy data file
+ };
+
+#pragma pack(pop)
+
+ static_assert(sizeof(LegacyDiskIndexEntry) == 36);
+
+ // File name extensions used when building bucket file paths
+ const char* IndexExtension = ".uidx"; // Index snapshot file
+ const char* LogExtension = ".slog"; // Index log file (also used for the legacy log)
+ const char* DataExtension = ".sobs"; // Block data file (also used for the legacy data file)
+
+    // Builds the path of the data file for block number BlockIndex below BlocksBasePath.
+    // Layout: <BlocksBasePath>/<first four hex digits>/<all hex digits><DataExtension>
+    std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
+    {
+        // Hex representation of the block index, used for both the folder and the file name
+        char HexDigits[9];
+        ToHexNumber(BlockIndex, HexDigits);
+
+        ExtendablePathBuilder<256> Builder;
+
+        // Base folder
+        Builder.Append(BlocksBasePath);
+        Builder.AppendSeparator();
+
+        // Sub folder named after the leading four hex digits
+        Builder.AppendAsciiRange(HexDigits, HexDigits + 4);
+        Builder.AppendSeparator();
+
+        // File name: full hex string plus the data extension
+        Builder.Append(HexDigits);
+        Builder.Append(DataExtension);
+
+        return Builder.ToPath();
+    }
+
+    // Path of the index snapshot file of a bucket: <BucketDir>/<BucketName><IndexExtension>
+    std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+    {
+        const std::string FileName = BucketName + IndexExtension;
+        return BucketDir / FileName;
+    }
+
+    // Path of the temporary index snapshot file of a bucket: <BucketDir>/<BucketName>.tmp<IndexExtension>
+    std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+    {
+        std::string FileName = BucketName;
+        FileName += ".tmp";
+        FileName += IndexExtension;
+        return BucketDir / FileName;
+    }
+
+    // Path of the index log file of a bucket: <BucketDir>/<BucketName><LogExtension>
+    std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+    {
+        const std::string FileName = BucketName + LogExtension;
+        return BucketDir / FileName;
+    }
+
+    // Path of the legacy log file of a bucket: <BucketDir>/zen<LogExtension>
+    std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir)
+    {
+        std::string FileName("zen");
+        FileName += LogExtension;
+        return BucketDir / FileName;
+    }
+
+    // Path of the legacy data file of a bucket: <BucketDir>/zen<DataExtension>
+    std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir)
+    {
+        std::string FileName("zen");
+        FileName += DataExtension;
+        return BucketDir / FileName;
+    }
+
+    // Converts the outcome of a move/delete pass into log records.
+    //   MovedChunks   - key -> new location; emitted as regular entries
+    //   DeletedChunks - keys that were removed; emitted as tombstone entries with a default location
+    // Returns the moved entries followed by the tombstones.
+    std::vector<DiskIndexEntry> MakeDiskIndexEntries(const std::unordered_map<IoHash, DiskLocation>& MovedChunks,
+                                                     const std::vector<IoHash>&                      DeletedChunks)
+    {
+        std::vector<DiskIndexEntry> result;
+        // Both inputs end up in the result, so reserve for the sum to avoid a reallocation
+        result.reserve(MovedChunks.size() + DeletedChunks.size());
+        for (const auto& MovedEntry : MovedChunks)
+        {
+            result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second});
+        }
+        for (const IoHash& ChunkHash : DeletedChunks)
+        {
+            DiskLocation Location;
+            Location.Flags |= DiskLocation::kTombStone;
+            result.push_back({.Key = ChunkHash, .Location = Location});
+        }
+        return result;
+    }
+
+    // Checks that a record read from the legacy log is plausible.
+    // Returns true when the entry is usable; otherwise returns false and describes the problem in OutReason.
+    // Mirrors ValidateEntry(): zero key and unknown flag bits are rejected, tombstones are accepted
+    // without a size check, and any other entry must have a non-zero size.
+    bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason)
+    {
+        if (Entry.Key == IoHash::Zero)
+        {
+            OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+            return false;
+        }
+        if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured |
+                                       LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed))
+        {
+            OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString());
+            return false;
+        }
+        // The size check applies to live entries only; the previous negated test skipped it
+        // for live entries and applied it to tombstones instead
+        if (Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
+        {
+            return true;
+        }
+        uint64_t Size = Entry.Location.Size();
+        if (Size == 0)
+        {
+            OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+            return false;
+        }
+        return true;
+    }
+
+    // Checks that an index/log record is plausible before it is used.
+    // Returns true when the entry is usable; otherwise returns false and describes the problem in OutReason.
+    bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
+    {
+        // A zero key never identifies a valid value
+        if (Entry.Key == IoHash::Zero)
+        {
+            OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+            return false;
+        }
+
+        // Any bit outside the set of known flags marks the record as invalid
+        const auto KnownFlags =
+            DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed;
+        if (Entry.Location.GetFlags() & ~KnownFlags)
+        {
+            OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
+            return false;
+        }
+
+        // Tombstone records are accepted without a size check; everything else needs a non-zero size
+        if (!Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+        {
+            const uint64_t PayloadSize = Entry.Location.Size();
+            if (PayloadSize == 0)
+            {
+                OutReason = fmt::format("Invalid size {} for entry {}", PayloadSize, Entry.Key.ToHexString());
+                return false;
+            }
+        }
+        return true;
+    }
+
+} // namespace
+
namespace fs = std::filesystem;
static CbObject
@@ -60,9 +270,9 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
}
ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir)
-: GcStorage(Gc)
+: m_RootDir(RootDir)
+, GcStorage(Gc)
, GcContributor(Gc)
-, m_RootDir(RootDir)
, m_DiskLayer(RootDir)
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
@@ -425,6 +635,8 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
using namespace std::literals;
+ m_BlocksBasePath = BucketDir / "blocks";
+
CreateDirectories(BucketDir);
std::filesystem::path ManifestPath{BucketDir / "zen_manifest"};
@@ -470,48 +682,694 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
+// Writes the current in-memory index to the bucket's index snapshot file.
+// Any existing snapshot is first renamed to a temporary file so it can be restored if
+// writing the new one fails; on success the temporary file is removed. Failures while
+// writing are logged and do not propagate to the caller.
void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
+ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
- m_BucketDir = BucketDir;
+ ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &EntryCount, &Timer] {
+ ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
+ m_BucketDir / m_BucketName,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
- uint64_t MaxFileOffset = 0;
- uint64_t InvalidEntryCount = 0;
- m_SobsCursor = 0;
- m_TotalSize = 0;
+ namespace fs = std::filesystem;
- m_Index.clear();
+ fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
- std::filesystem::path SobsPath{BucketDir / "zen.sobs"};
- std::filesystem::path SlogPath{BucketDir / "zen.slog"};
+ // Move index away, we keep it if something goes wrong
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+ if (fs::is_regular_file(IndexPath))
+ {
+ fs::rename(IndexPath, STmpIndexPath);
+ }
+
+ try
+ {
+ m_SlogFile.Flush();
- m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
- m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ // Write the current state of the location map to a new index state
+ uint64_t LogCount = 0;
+ std::vector<DiskIndexEntry> Entries;
- m_SlogFile.Replay(
- [&](const DiskIndexEntry& Entry) {
- if (Entry.Key == IoHash::Zero)
+ {
+ // Copy the index and sample the log count under the same locks so that the
+ // stored log position matches the copied entries
+ RwLock::SharedLockScope __(m_InsertLock);
+ RwLock::SharedLockScope ___(m_IndexLock);
+ Entries.resize(m_Index.size());
+
+ uint64_t EntryIndex = 0;
+ for (auto& Entry : m_Index)
{
- ++InvalidEntryCount;
+ DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = Entry.second.Location;
}
- else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+
+ LogCount = m_SlogFile.GetLogCount();
+ }
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+ CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+ Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
+
+ ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+ ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.Close();
+ EntryCount = Entries.size();
+ }
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
+
+ // Restore any previous snapshot. The non-throwing overloads are used so that a second
+ // failure cannot escape from this handler.
+ std::error_code Ec;
+ if (fs::is_regular_file(STmpIndexPath, Ec))
+ {
+ fs::remove(IndexPath, Ec);
+ fs::rename(STmpIndexPath, IndexPath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to restore index snapshot '{}' reason: '{}'", IndexPath, Ec.message());
+ }
+ }
+ // Do not fall through to the cleanup below: if the restore failed the temporary
+ // file is the only remaining copy of the previous snapshot
+ return;
+ }
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+}
+
+// Loads the bucket's index snapshot file, if present, into m_Index.
+// The header must carry the expected magic, version and checksum, a non-zero payload
+// alignment and an entry count that fits in the file; otherwise the file is ignored.
+// Returns the log position stored in the header (the number of log entries already
+// covered by the snapshot), or 0 when no valid snapshot was read.
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadIndexFile()
+{
+ std::vector<DiskIndexEntry> Entries;
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &Entries, &Timer] {
+ ZEN_INFO("read store '{}' index containing #{} entries in {}",
+ m_BucketDir / m_BucketName,
+ Entries.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CacheBucketIndexHeader))
+ {
+ // Upper bound for the entry count: what fits in the file after the header.
+ // (Was sizeof(sizeof(...)), which subtracted sizeof(size_t) instead of the header size.)
+ uint64_t ExpectedEntryCount = (Size - sizeof(CacheBucketIndexHeader)) / sizeof(DiskIndexEntry);
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) &&
+ (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+ (Header.EntryCount <= ExpectedEntryCount))
{
- m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ m_PayloadAlignment = Header.PayloadAlignment;
+
+ std::string InvalidEntryReason;
+ for (const DiskIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+
+ return Header.LogPosition;
}
else
{
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ ZEN_WARN("skipping invalid index file '{}'", IndexPath);
}
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size());
- },
- 0);
+ }
+ }
+ return 0;
+}
+// Replays the bucket's index log on top of m_Index.
+//   SkipEntryCount - number of leading log entries to skip (already covered by an index
+//                    snapshot); when it exceeds the log length the whole log is replayed.
+// Tombstone records erase their key, invalid records are skipped with a warning and all
+// other records are inserted or replace an existing entry.
+// Returns the number of log entries that were replayed, or 0 if the log is missing or
+// could not be initialized.
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
+{
+    std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+    if (!std::filesystem::is_regular_file(LogPath))
+    {
+        return 0;
+    }
+
+    uint64_t   LogEntryCount = 0;
+    Stopwatch  Timer;
+    const auto _ = MakeGuard([LogPath, &LogEntryCount, &Timer] {
+        ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+    });
+
+    TCasLogFile<DiskIndexEntry> CasLog;
+    CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+    if (!CasLog.Initialize())
+    {
+        return 0;
+    }
+
+    uint64_t EntryCount = CasLog.GetLogCount();
+    if (EntryCount < SkipEntryCount)
+    {
+        ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+        SkipEntryCount = 0;
+    }
+    LogEntryCount = EntryCount - SkipEntryCount;
+    m_Index.reserve(LogEntryCount);
+
+    uint64_t InvalidEntryCount = 0;
+    CasLog.Replay(
+        [&](const DiskIndexEntry& Record) {
+            // Tombstones are handled before validation since they carry no payload location
+            if (Record.Location.IsFlagSet(DiskLocation::kTombStone))
+            {
+                m_Index.erase(Record.Key);
+                return;
+            }
+            std::string InvalidEntryReason;
+            if (!ValidateEntry(Record, InvalidEntryReason))
+            {
+                ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+                ++InvalidEntryCount;
+                return;
+            }
+            m_Index.insert_or_assign(Record.Key, IndexEntry(Record.Location, GcClock::TickCount()));
+        },
+        SkipEntryCount);
+    if (InvalidEntryCount)
+    {
+        ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
+    }
+
+    // Report how much of the log was applied so the caller can decide whether a new
+    // index snapshot is worthwhile (previously this always returned 0)
+    return LogEntryCount;
+}
+
+// Migrates a bucket stored in the legacy layout (one 'zen' log file plus one 'zen' data
+// file) to the block store layout, appending the migrated entries to the bucket log and
+// to m_Index. Keys already present in m_Index are not migrated.
+//   CleanSource - when true the legacy files are opened for writing; the legacy data is
+//                 either moved as-is into a single block or trimmed block by block while it
+//                 is copied, and both legacy files are reset to zero length at the end.
+//                 When false the legacy files are opened read-only and left in place.
+// Returns the number of migrated chunks; 0 when there is nothing to migrate or when the
+// migration is aborted (for example due to insufficient disk space).
+uint64_t
+ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
+{
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
+
+ if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
+ {
+ return 0;
+ }
+
+ ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName);
+
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
+
+ uint64_t MigratedChunkCount = 0;
+ uint32_t MigratedBlockCount = 0;
+ Stopwatch MigrationTimer;
+ uint64_t TotalSize = 0;
+ const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] {
+ ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
+ m_BucketDir / m_BucketName,
+ MigratedChunkCount,
+ MigratedBlockCount,
+ NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
+ NiceBytes(TotalSize));
+ });
+
+ // First block index that does not already have a file on disk
+ uint32_t WriteBlockIndex = 0;
+ while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
+ }
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
+ return 0;
+ }
+
+ if (Space.Free < MaxBlockSize)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
+ m_BucketDir / m_BucketName,
+ NiceBytes(MaxBlockSize),
+ NiceBytes(Space.Free));
+ return 0;
+ }
+
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+
+ std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
+ uint64_t InvalidEntryCount = 0;
+
+ TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog;
+ LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
+ {
+ Stopwatch Timer;
+ const auto __ = MakeGuard([LegacyLogPath, &LegacyDiskIndex, &Timer] {
+ ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
+ LegacyLogPath,
+ LegacyDiskIndex.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ if (LegacyCasLog.Initialize())
+ {
+ LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
+ LegacyCasLog.Replay(
+ [&](const LegacyDiskIndexEntry& Record) {
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
+ {
+ LegacyDiskIndex.erase(Record.Key);
+ return;
+ }
+ std::string InvalidEntryReason;
+ if (!ValidateLegacyEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ if (m_Index.contains(Record.Key))
+ {
+ return;
+ }
+ LegacyDiskIndex[Record.Key] = Record;
+ },
+ 0);
+
+ // Drop entries whose payload range lies outside of the legacy data file
+ std::vector<IoHash> BadEntries;
+ uint64_t BlockFileSize = BlockFile.FileSize();
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyDiskIndexEntry& Record(Entry.second);
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize)
+ {
+ continue;
+ }
+ ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
+ BadEntries.push_back(Entry.first);
+ }
+ for (const IoHash& BadHash : BadEntries)
+ {
+ LegacyDiskIndex.erase(BadHash);
+ }
+ InvalidEntryCount += BadEntries.size();
+ }
+ }
if (InvalidEntryCount)
{
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath);
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
}
- m_SobsCursor = (MaxFileOffset + 15) & ~15;
+ if (LegacyDiskIndex.empty())
+ {
+ LegacyCasLog.Close();
+ BlockFile.Close();
+ if (CleanSource)
+ {
+ // Older versions of ZenCacheDiskLayer expect the legacy files to exist if they can find
+ // a manifest, and crash on startup if they don't.
+ // In order to not break startup when switching back to an older version, just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ }
+ return 0;
+ }
+
+ // Estimate how much block storage the non-standalone chunks need
+ uint64_t BlockChunkCount = 0;
+ uint64_t BlockTotalSize = 0;
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyDiskIndexEntry& Record(Entry.second);
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ BlockChunkCount++;
+ BlockTotalSize += Record.Location.Size();
+ }
+
+ uint64_t RequiredDiskSpace = BlockTotalSize + ((m_PayloadAlignment - 1) * BlockChunkCount);
+ uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize;
+ if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
+ m_BucketDir / m_BucketName,
+ MaxRequiredBlockCount,
+ BlockStoreDiskLocation::MaxBlockIndex);
+ return 0;
+ }
+
+ constexpr const uint64_t DiskReserve = 1ul << 28;
+
+ if (CleanSource)
+ {
+ // The source is trimmed as blocks are written, so one block of headroom is enough
+ if (Space.Free < (MaxBlockSize + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ m_BucketDir / m_BucketName,
+ NiceBytes(MaxBlockSize + DiskReserve),
+ NiceBytes(Space.Free));
+ return 0;
+ }
+ }
+ else
+ {
+ if (Space.Free < (RequiredDiskSpace + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ m_BucketDir / m_BucketName,
+ NiceBytes(RequiredDiskSpace + DiskReserve),
+ NiceBytes(Space.Free));
+ return 0;
+ }
+ }
+
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ CreateDirectories(LogPath.parent_path());
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ if (CleanSource && (MaxRequiredBlockCount < 2))
+ {
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(LegacyDiskIndex.size());
+
+ // We can use the block as is, just move it and add the blocks to our new log
+ for (auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyDiskIndexEntry& Record(Entry.second);
+ TotalSize += Record.Location.Size();
+
+ DiskLocation NewLocation;
+ uint8_t Flags = 0xff & (Record.Location.Flags() >> 56);
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ NewLocation = DiskLocation(Record.Location.Size(), Flags);
+ }
+ else
+ {
+ BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.Offset(), Record.Location.Size());
+ NewLocation = DiskLocation(NewChunkLocation, m_PayloadAlignment, Flags);
+ }
+ LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation});
+ }
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ CreateDirectories(BlockPath.parent_path());
+ BlockFile.Close();
+ std::filesystem::rename(LegacyDataPath, BlockPath);
+ CasLog.Append(LogEntries);
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+
+ MigratedChunkCount += LogEntries.size();
+ MigratedBlockCount++;
+ }
+ else
+ {
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(LegacyDiskIndex.size());
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ ChunkHashes.push_back(Entry.first);
+ }
+
+ // Process chunks in the order they appear in the legacy data file
+ std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
+ auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
+ auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
+ return LhsKeyIt->second.Location.Offset() < RhsKeyIt->second.Location.Offset();
+ });
+
+ uint64_t BlockSize = 0;
+ uint64_t BlockOffset = 0;
+ // A contiguous range of the legacy data file that becomes one new block
+ struct BlockData
+ {
+ std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
+ uint64_t BlockOffset;
+ uint64_t BlockSize;
+ uint32_t BlockIndex;
+ };
+
+ std::vector<BlockData> BlockRanges;
+ std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
+ BlockRanges.reserve(MaxRequiredBlockCount);
+ for (const IoHash& ChunkHash : ChunkHashes)
+ {
+ const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
+
+ if (LegacyChunkLocation.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ // For standalone files we just store the chunk hash and use the size from the legacy index as is
+ Chunks.push_back({ChunkHash, {}});
+ continue;
+ }
+
+ uint64_t ChunkOffset = LegacyChunkLocation.Offset();
+ uint64_t ChunkSize = LegacyChunkLocation.Size();
+ uint64_t ChunkEnd = ChunkOffset + ChunkSize;
+
+ if (BlockSize == 0)
+ {
+ BlockOffset = ChunkOffset;
+ }
+ if ((ChunkEnd - BlockOffset) > MaxBlockSize)
+ {
+ // The chunk does not fit in the current range; close it and start a new one
+ BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
+ BlockRange.Chunks.swap(Chunks);
+ BlockRanges.push_back(BlockRange);
+
+ WriteBlockIndex++;
+ while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
+ }
+ BlockOffset = ChunkOffset;
+ BlockSize = 0;
+ }
+ BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
+ Chunks.push_back({ChunkHash, ChunkLocation});
+ BlockSize = ChunkEnd - BlockOffset;
+ }
+ if (BlockSize > 0)
+ {
+ BlockRanges.push_back(
+ {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
+ }
+ Stopwatch WriteBlockTimer;
+
+ // Work from the end of the legacy data file towards the start so that the source
+ // file can be shortened after each block when CleanSource is set
+ std::reverse(BlockRanges.begin(), BlockRanges.end());
+ std::vector<std::uint8_t> Buffer(1 << 28);
+ for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
+ {
+ const BlockData& BlockRange = BlockRanges[Idx];
+ if (Idx > 0)
+ {
+ // BlockOffset + BlockSize still describe the end of the last range built above
+ uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
+ uint64_t Completed = BlockOffset + BlockSize - Remaining;
+ uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
+
+ ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
+ m_BucketDir / m_BucketName,
+ Idx,
+ BlockRanges.size(),
+ NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
+ NiceBytes(BlockOffset + BlockSize),
+ NiceTimeSpanMs(ETA));
+ }
+
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex);
+ BlockStoreFile ChunkBlock(BlockPath);
+ ChunkBlock.Create(BlockRange.BlockSize);
+ // Copy the range from the legacy data file into the new block, one buffer at a time
+ uint64_t Offset = 0;
+ while (Offset < BlockRange.BlockSize)
+ {
+ uint64_t Size = BlockRange.BlockSize - Offset;
+ if (Size > Buffer.size())
+ {
+ Size = Buffer.size();
+ }
+ BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
+ ChunkBlock.Write(Buffer.data(), Size, Offset);
+ Offset += Size;
+ }
+ ChunkBlock.Truncate(Offset);
+ ChunkBlock.Flush();
+
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(BlockRange.Chunks.size());
+ for (const auto& Entry : BlockRange.Chunks)
+ {
+ const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first];
+ TotalSize += LegacyEntry.Location.Size();
+
+ DiskLocation NewLocation;
+ uint8_t Flags = 0xff & (LegacyEntry.Location.Flags() >> 56);
+ if (LegacyEntry.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ NewLocation = DiskLocation(LegacyEntry.Location.Size(), Flags);
+ }
+ else
+ {
+ NewLocation = DiskLocation(Entry.second, m_PayloadAlignment, Flags);
+ }
+ LogEntries.push_back({.Key = Entry.first, .Location = NewLocation});
+ }
+ CasLog.Append(LogEntries);
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+ MigratedChunkCount += LogEntries.size();
+ MigratedBlockCount++;
+
+ if (CleanSource)
+ {
+ // Mark the migrated chunks as deleted in the legacy log and cut the migrated
+ // range off the end of the legacy data file
+ std::vector<LegacyDiskIndexEntry> LegacyLogEntries;
+ LegacyLogEntries.reserve(BlockRange.Chunks.size());
+ for (const auto& Entry : BlockRange.Chunks)
+ {
+ LegacyLogEntries.push_back(
+ {.Key = Entry.first, .Location = LegacyDiskLocation(0, 0, 0, LegacyDiskLocation::kTombStone)});
+ }
+ LegacyCasLog.Append(LegacyLogEntries);
+ BlockFile.SetFileSize(BlockRange.BlockOffset);
+ }
+ }
+ }
+ BlockFile.Close();
+ LegacyCasLog.Close();
+ CasLog.Close();
+
+ if (CleanSource)
+ {
+ // Older versions of ZenCacheDiskLayer expect the legacy files to exist if they can find
+ // a manifest, and crash on startup if they don't.
+ // In order to not break startup when switching back to an older version, just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ }
+ return MigratedChunkCount;
+}
+
+// Opens (or recreates) the on-disk state of a bucket located in BucketDir and rebuilds
+// the in-memory index from the index snapshot, the log and any legacy data.
+//   BucketDir - directory holding the bucket's files
+//   IsNew     - when true, existing legacy, log, index and block files are removed first
+void
+ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
+{
+ m_BucketDir = BucketDir;
+
+ m_TotalSize = 0;
+
+ m_Index.clear();
+
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+
+ // A new bucket starts from a clean slate: drop every file a previous bucket may have left
+ if (IsNew)
+ {
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
+ fs::remove(LegacyLogPath);
+ fs::remove(LegacyDataPath);
+ fs::remove(LogPath);
+ fs::remove(IndexPath);
+ fs::remove_all(m_BlocksBasePath);
+ }
+
+ // Populate m_Index: snapshot first, then the log entries past the snapshot's log position,
+ // then whatever can be migrated from the legacy files (cleaning the legacy source)
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
+ uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+
+ CreateDirectories(m_BucketDir);
+
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ // Sum up the stored size and collect the block indexes referenced by the index
+ std::unordered_set<uint32_t> KnownBlocks;
+ for (const auto& Entry : m_Index)
+ {
+ const DiskLocation& Location = Entry.second.Location;
+ m_TotalSize.fetch_add(Location.Size(), std::memory_order_release);
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ KnownBlocks.insert(Location.GetBlockLocation(m_PayloadAlignment).BlockIndex);
+ }
+
+ if (std::filesystem::is_directory(m_BlocksBasePath))
+ {
+ // Walk the blocks folder and its sub folders using an explicit work list
+ std::vector<std::filesystem::path> FoldersToScan;
+ FoldersToScan.push_back(m_BlocksBasePath);
+ size_t FolderOffset = 0;
+ while (FolderOffset < FoldersToScan.size())
+ {
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
+ {
+ if (Entry.is_directory())
+ {
+ FoldersToScan.push_back(Entry.path());
+ continue;
+ }
+ if (Entry.is_regular_file())
+ {
+ // Only files with the data extension and a hex number as name are block files
+ const std::filesystem::path Path = Entry.path();
+ if (Path.extension() != DataExtension)
+ {
+ continue;
+ }
+ std::string FileName = Path.stem().string();
+ uint32_t BlockIndex;
+ bool OK = ParseHexNumber(FileName, BlockIndex);
+ if (!OK)
+ {
+ continue;
+ }
+ if (!KnownBlocks.contains(BlockIndex))
+ {
+ // Log removing unreferenced block
+ // Clear out unused blocks
+ ZEN_INFO("removing unused block for '{}' at '{}'", m_BucketDir / m_BucketName, Path);
+ std::error_code Ec;
+ std::filesystem::remove(Path, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
+ }
+ continue;
+ }
+ // Referenced block: open it and register it under its block index
+ Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
+ BlockFile->Open();
+ m_ChunkBlocks[BlockIndex] = BlockFile;
+ }
+ }
+ ++FolderOffset;
+ }
+ }
+ else
+ {
+ CreateDirectories(m_BlocksBasePath);
+ }
+
+ // Write a fresh snapshot for a new bucket or when log/legacy entries were reported as read
+ if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ {
+ MakeIndexSnapshot();
+ }
// TODO: should validate integrity of container files here
}
void
@@ -537,7 +1395,10 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen
return false;
}
- OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size());
+ const BlockStoreLocation& Location = Loc.GetBlockLocation(m_PayloadAlignment);
+ Ref<BlockStoreFile> ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
+
+ OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size);
OutValue.Value.SetContentType(Loc.GetContentType());
return true;
@@ -562,23 +1423,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
return false;
}
-void
-ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc,
- const IoHash& HashKey,
- const fs::path& Path,
- std::error_code& Ec)
-{
- ZEN_DEBUG("deleting standalone cache file '{}'", Path);
- fs::remove(Path, Ec);
-
- if (!Ec)
- {
- m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}});
- m_Index.erase(HashKey);
- m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
- }
-}
-
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -619,54 +1463,91 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
{
return PutStandaloneCacheValue(HashKey, Value);
}
- else
- {
- // Small object put
- uint64_t EntryFlags = 0;
+ // Small object put
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
+ uint8_t EntryFlags = 0;
- RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
+
+ uint64_t ChunkSize = Value.Value.Size();
- DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags);
+ uint32_t WriteBlockIndex;
+ Ref<BlockStoreFile> WriteBlock;
+ uint64_t InsertOffset;
- m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16);
+ // Reserve space for the chunk in the active write block. Only the reservation (block
+ // selection + offset bump) happens under m_InsertLock; the payload itself is written
+ // by the caller code below after this scope has released the lock.
+ {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
+ WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ bool IsWriting = m_WriteBlock != nullptr;
+ // Roll over to a new block when there is no active block or the chunk does not fit.
+ if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > MaxBlockSize)
{
- // Previously unknown object
- m_Index.insert({HashKey, {Loc, GcClock::TickCount()}});
+ if (m_WriteBlock)
+ {
+ m_WriteBlock = nullptr;
+ }
+ {
+ // m_ChunkBlocks is guarded by the index lock, so the new block is registered under it.
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BucketDir / m_BucketName));
+ }
+ // Step past the block we were writing to. The result is masked so the index wraps
+ // the same way as in the probing loop below and in Flush().
+ WriteBlockIndex = (WriteBlockIndex + (IsWriting ? 1 : 0)) & BlockStoreDiskLocation::MaxBlockIndex;
+ // Probe forward (with wrap-around) until an unused block index is found.
+ while (m_ChunkBlocks.contains(WriteBlockIndex))
+ {
+ WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
+ }
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ m_WriteBlock = new BlockStoreFile(BlockPath);
+ m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ }
+ m_CurrentInsertOffset = 0;
+ // NOTE(review): the block is already registered in m_ChunkBlocks when Create() runs;
+ // confirm what state is left behind if Create() fails.
+ m_WriteBlock->Create(MaxBlockSize);
}
- else
+ InsertOffset = m_CurrentInsertOffset;
+ // The next chunk starts at the payload alignment boundary following this one.
+ m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment);
+ WriteBlock = m_WriteBlock;
+ }
+
+ DiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment, EntryFlags);
+ const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
+
+ WriteBlock->Write(Value.Value.Data(), ChunkSize, InsertOffset);
+ m_SlogFile.Append(DiskIndexEntry);
+
+ m_TotalSize.fetch_add(ChunkSize, std::memory_order::relaxed);
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
// TODO: should check if write is idempotent and bail out if it is?
// this would requiring comparing contents on disk unless we add a
// content hash to the index entry
IndexEntry& Entry = It.value();
- Entry.Location = Loc;
+ Entry.Location = Location;
Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
}
-
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset());
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ else
+ {
+ m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
+ }
}
}
// Drops the bucket: closes the index log file and deletes the bucket directory.
// As the TODOs below state, this does not yet close the other open files, takes no
// locks, and does not handle errors (m_ChunkBlocks / m_WriteBlock are not touched here).
void
ZenCacheDiskLayer::CacheBucket::Drop()
{
+ // TODO: close all open files and manage locking
// TODO: add error handling
-
- m_SobsFile.Close();
m_SlogFile.Close();
DeleteDirectories(m_BucketDir);
}
@@ -674,10 +1555,20 @@ ZenCacheDiskLayer::CacheBucket::Drop()
// Flushes bucket state to disk in two steps:
//  1. Under the exclusive insert lock: if anything has been reserved in the current
//     write block (m_CurrentInsertOffset > 0), the block is retired - m_WriteBlock is
//     dropped and m_WriteBlockIndex is advanced (wrapping via MaxBlockIndex), so the
//     next Put() finds no active block and opens a new one.
//  2. Under the shared index lock: writes an index snapshot and saves the manifest.
// NOTE(review): the retired block is not truncated to m_CurrentInsertOffset here, whereas
// CollectGarbage() truncates the blocks it writes - confirm this is intentional.
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
+ {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ if (m_CurrentInsertOffset > 0)
+ {
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
+ m_WriteBlock = nullptr;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
+ }
+ }
// The insert lock has been released at this point; only the index lock is held below.
RwLock::SharedLockScope _(m_IndexLock);
- m_SobsFile.Flush();
- m_SlogFile.Flush();
+ MakeIndexSnapshot();
SaveManifest();
}
@@ -754,9 +1645,10 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
// Log a tombstone and delete the in-memory index for the bad entry
- const auto It = m_Index.find(BadKey);
- const DiskLocation& Location = It->second.Location;
- m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}});
+ const auto It = m_Index.find(BadKey);
+ DiskLocation Location = It->second.Location;
+ Location.Flags |= DiskLocation::kTombStone;
+ m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = Location});
m_Index.erase(BadKey);
}
}
@@ -768,8 +1660,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences");
Stopwatch Timer;
- const auto Guard = MakeGuard(
- [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ const auto Guard = MakeGuard([this, &Timer] {
+ ZEN_INFO("gathered references from '{}' in {}", m_BucketDir / m_BucketName, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
const GcClock::TimePoint ExpireTime =
GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration();
@@ -820,6 +1713,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
}
}
+ _.ReleaseNow();
ValidKeys.reserve(std::distance(ValidIt, Entries.end()));
ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt));
@@ -836,202 +1730,480 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage");
- Flush();
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
+ std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
+
+ Stopwatch TotalTimer;
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+ uint64_t DeletedCount = 0;
+ uint64_t MovedCount = 0;
+
// Summary guard for CollectGarbage(): the callback logs the totals gathered during the
// pass and then saves the manifest under a shared index lock. Presumably MakeGuard runs
// the callback when `_` goes out of scope, i.e. on every exit path of the function.
// All counters are captured by reference so the values logged are the final ones.
// NOTE(review): the *TimeUs counters are accumulated from Stopwatch::GetElapsedTimeUs() but
// are formatted with NiceLatencyNs() - confirm the unit that helper expects.
// NOTE(review): "entires" in the message below looks like a typo for "entries".
+ const auto _ = MakeGuard([this,
+ &TotalTimer,
+ &WriteBlockTimeUs,
+ &WriteBlockLongestTimeUs,
+ &ReadBlockTimeUs,
+ &ReadBlockLongestTimeUs,
+ &TotalChunkCount,
+ &DeletedCount,
+ &MovedCount,
+ &DeletedSize,
+ &OldTotalSize] {
+ ZEN_INFO(
+ "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+ "#{} "
+ "of #{} "
+ "entires ({}).",
+ m_BucketDir / m_BucketName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedCount,
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ RwLock::SharedLockScope _(m_IndexLock);
+ SaveManifest();
+ });
- const uint64_t OldCount = m_Index.size();
- const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+ m_SlogFile.Flush();
- ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir);
+ IndexMap Index;
+ size_t BlockCount;
+ uint64_t ExcludeBlockIndex = 0x800000000ull;
- Stopwatch Timer;
- const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] {
- const uint64_t NewCount = m_Index.size();
- const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed);
- ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})",
- m_BucketDir,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- OldCount - NewCount,
- NiceBytes(OldTotalSize - NewTotalSize),
- OldCount,
- NiceBytes(OldTotalSize));
- SaveManifest();
+ std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName);
+ std::vector<IoHash> DeleteCacheKeys;
+ DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
+ GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
+ if (Keep)
+ {
+ return;
+ }
+ DeleteCacheKeys.push_back(ChunkHash);
});
-
- if (m_Index.empty())
+ if (DeleteCacheKeys.empty())
{
+ ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName);
return;
}
-
- auto AddEntries = [this](std::span<const IoHash> Keys, std::vector<IndexMap::value_type>& OutEntries) {
- for (const IoHash& Key : Keys)
// Snapshot phase of CollectGarbage(). Takes the insert lock and the index lock (both
// shared), records the block currently being written to (it is skipped later when chunks
// are classified per block), saves the manifest, and copies m_Index and the block count
// into locals used by the rest of the pass. It then collects the expired entries that
// are standalone files and, in deletion mode, removes them from the live index and
// appends their tombstones to the log.
// NOTE(review): m_Index.erase() below runs while m_IndexLock is only held shared - confirm
// this cannot race with other holders of the shared lock.
+ {
+ RwLock::SharedLockScope __(m_InsertLock);
+ RwLock::SharedLockScope ___(m_IndexLock);
{
- if (auto It = m_Index.find(Key); It != m_Index.end())
// Accounts the time spent in this inner scope (while the insert lock is held) to the
// "write lock" statistics.
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.empty())
+ {
+ ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
+ return;
+ }
+ if (m_WriteBlock)
{
- OutEntries.push_back(*It);
+ ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
}
// Only the insert lock is released early; the index lock stays held until the end of
// the enclosing scope.
+ __.ReleaseNow();
}
- };
-
- std::vector<IndexMap::value_type> ValidEntries;
- std::vector<IndexMap::value_type> ExpiredEntries;
+ SaveManifest();
+ Index = m_Index;
+ BlockCount = m_ChunkBlocks.size();
- AddEntries(GcCtx.ValidCacheKeys(m_BucketName), ValidEntries);
- AddEntries(GcCtx.ExpiredCacheKeys(m_BucketName), ExpiredEntries);
// Pick out the expired keys whose payload lives in a standalone file and mark the copied
// entry as a tombstone; entries stored in blocks are handled later by the block rewrite.
+ for (const IoHash& Key : DeleteCacheKeys)
+ {
+ if (auto It = Index.find(Key); It != Index.end())
+ {
+ DiskIndexEntry Entry = {.Key = It->first, .Location = It->second.Location};
+ if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ Entry.Location.Flags |= DiskLocation::kTombStone;
+ ExpiredStandaloneEntries.push_back(Entry);
+ }
+ }
+ }
+ if (GcCtx.IsDeletionMode())
+ {
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ m_Index.erase(Entry.Key);
+ }
+ m_SlogFile.Append(ExpiredStandaloneEntries);
+ }
+ }
- // Remove all standalone file(s)
- // NOTE: This can probably be made asynchronously
+ // Delete the files backing the expired standalone entries. In deletion mode these keys
+ // have already been erased from m_Index and tombstoned in the log by the snapshot phase
+ // above, so this loop only has to remove the files - or undo the tombstone when the
+ // removal fails.
+ if (GcCtx.IsDeletionMode())
{
std::error_code Ec;
ExtendablePathBuilder<256> Path;
- for (const auto& Entry : ExpiredEntries)
+ for (const auto& Entry : ExpiredStandaloneEntries)
{
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
+ const IoHash& Key = Entry.Key;
+ const DiskLocation& Loc = Entry.Location;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- Path.Reset();
- BuildPath(Path, Key);
+ Path.Reset();
+ BuildPath(Path, Key);
- // NOTE: this will update index and log file
- DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec);
+ {
+ // Hold the index lock (shared) across the check and the remove.
+ RwLock::SharedLockScope __(m_IndexLock);
+ if (m_Index.contains(Key))
+ {
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
+ continue;
+ }
+ ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
+ fs::remove(Path.c_str(), Ec);
+ }
- if (Ec)
+ if (Ec)
+ {
+ ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
+ Ec.clear();
+ // The file is still on disk: bring the entry back without the tombstone flag.
+ DiskLocation RestoreLocation = Loc;
+ RestoreLocation.Flags &= ~DiskLocation::kTombStone;
+
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
{
- ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", Path.ToUtf8(), Ec.message());
- Ec.clear();
+ continue;
}
+ m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
+ // Re-insert the restored location; `Loc` still carries kTombStone and must not end
+ // up in the live index.
+ m_Index.insert({Key, {RestoreLocation, GcClock::TickCount()}});
+ // m_TotalSize is intentionally left alone here: it was never decremented for this
+ // entry (that only happens below, after a successful delete), so adding the size
+ // again would double-count it.
+ continue;
}
+ m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ DeletedSize += Entry.Location.Size();
+ DeletedCount++;
+ }
+ }
+
+ TotalChunkCount = Index.size();
+
+ std::vector<IoHash> TotalChunkHashes;
+ TotalChunkHashes.reserve(TotalChunkCount);
+ for (const auto& Entry : Index)
+ {
+ const DiskLocation& Location = Entry.second.Location;
+
+ if (Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ continue;
}
+ TotalChunkHashes.push_back(Entry.first);
}
- if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty())
+ if (TotalChunkHashes.empty())
{
- // Naive GC implementation of small objects. Needs enough free
- // disk space to store intermediate sob container along side the
- // old container
+ return;
+ }
+ std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
+ std::vector<std::vector<IoHash>> KeepChunks;
+ std::vector<std::vector<IoHash>> DeleteChunks;
- const auto ResetSobStorage = [this, &ValidEntries]() {
- m_SobsFile.Close();
- m_SlogFile.Close();
+ BlockIndexToChunkMapIndex.reserve(BlockCount);
+ KeepChunks.reserve(BlockCount);
+ DeleteChunks.reserve(BlockCount);
+ size_t GuesstimateCountPerBlock = TotalChunkHashes.size() / BlockCount / 2;
- const bool IsNew = true;
- m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
- m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ uint64_t DeleteCount = 0;
- m_SobsCursor = 0;
- m_TotalSize = 0;
- m_Index.clear();
+ uint64_t NewTotalSize = 0;
- for (const auto& Entry : ValidEntries)
- {
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
+ std::unordered_set<IoHash, IoHash::Hasher> Expired;
+ Expired.insert(DeleteCacheKeys.begin(), DeleteCacheKeys.end());
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- m_SlogFile.Append({.Key = Key, .Location = Loc});
- m_Index.insert({Key, {Loc, GcClock::TickCount()}});
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
- }
- }
- };
+ GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ auto KeyIt = Index.find(ChunkHash);
+ const DiskLocation& Location = KeyIt->second.Location;
+ BlockStoreLocation BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
- uint64_t NewContainerSize{};
- for (const auto& Entry : ValidEntries)
+ uint32_t BlockIndex = BlockLocation.BlockIndex;
+
+ if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex)
{
- const DiskLocation& Loc = Entry.second.Location;
+ return;
+ }
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false)
- {
- NewContainerSize += (Loc.Size() + sizeof(DiskLocation));
- }
+ auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex);
+ size_t ChunkMapIndex = 0;
+ if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
+ {
+ ChunkMapIndex = KeepChunks.size();
+ BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex;
+ KeepChunks.resize(ChunkMapIndex + 1);
+ KeepChunks.back().reserve(GuesstimateCountPerBlock);
+ DeleteChunks.resize(ChunkMapIndex + 1);
+ DeleteChunks.back().reserve(GuesstimateCountPerBlock);
}
+ else
+ {
+ ChunkMapIndex = BlockIndexPtr->second;
+ }
+ if (Keep)
+ {
+ std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex];
+ ChunkMap.push_back(ChunkHash);
+ NewTotalSize += BlockLocation.Size;
+ }
+ else
+ {
+ std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
+ ChunkMap.push_back(ChunkHash);
+ DeleteCount++;
+ }
+ });
- if (NewContainerSize == 0)
+ std::unordered_set<uint32_t> BlocksToReWrite;
+ BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
+ for (const auto& Entry : BlockIndexToChunkMapIndex)
+ {
+ uint32_t BlockIndex = Entry.first;
+ size_t ChunkMapIndex = Entry.second;
+ const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
+ if (ChunkMap.empty())
{
- ResetSobStorage();
- return;
+ continue;
}
+ BlocksToReWrite.insert(BlockIndex);
+ }
- const uint64_t DiskSpaceMargin = (256 << 10);
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ if (!PerformDelete)
+ {
+ uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
+ ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_BucketDir / m_BucketName,
+ DeleteCount,
+ NiceBytes(TotalSize - NewTotalSize),
+ TotalChunkCount,
+ NiceBytes(TotalSize));
+ return;
+ }
- std::error_code Ec;
- DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec);
- if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin)
+ // Adds the given deleted chunks to the GC statistics (DeletedSize / DeletedCount).
+ // Sizes are read from the `Index` snapshot; every hash is expected to refer to a chunk
+ // stored in a block, never to a standalone file.
+ auto AddToDeleted = [this, &Index, &DeletedCount, &DeletedSize](const std::vector<IoHash>& DeletedEntries) {
+ for (const IoHash& ChunkHash : DeletedEntries)
{
- ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)",
- m_BucketDir,
- NiceBytes(NewContainerSize),
- NiceBytes(Space.Free));
- return;
+ const DiskLocation& Location = Index[ChunkHash].Location;
+ ZEN_ASSERT(!Location.IsFlagSet(DiskLocation::kStandaloneFile));
+ // Reuse the location looked up above instead of doing a second index lookup.
+ DeletedSize += Location.GetBlockLocation(m_PayloadAlignment).Size;
}
+ DeletedCount += DeletedEntries.size();
+ };
- std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"};
- std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"};
+ // Move all chunks in blocks that have chunks removed to new blocks
- // Copy non expired sob(s) to temporary sob container
+ Ref<BlockStoreFile> NewBlockFile;
+ uint64_t WriteOffset = 0;
+ uint32_t NewBlockIndex = 0;
+ // Applies a batch of log entries to the live index: tombstoned entries are removed (and
+ // their size subtracted from m_TotalSize), all other entries get their location updated.
+ // Touches m_Index without locking - every caller below holds m_IndexLock exclusively.
+ auto UpdateLocations = [this](const std::span<DiskIndexEntry>& Entries) {
+ for (const DiskIndexEntry& Entry : Entries)
{
- BasicFile TmpSobs;
- TCasLogFile<DiskIndexEntry> TmpLog;
- uint64_t TmpCursor{};
- std::vector<uint8_t> Chunk;
+ if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+ {
+ auto KeyIt = m_Index.find(Entry.Key);
+ if (KeyIt == m_Index.end())
+ {
+ // The key is already gone from the live index; there is nothing to remove
+ // and dereferencing end() would be undefined behavior.
+ continue;
+ }
+ uint64_t ChunkSize = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment).Size;
+ m_TotalSize.fetch_sub(ChunkSize, std::memory_order::relaxed);
+ m_Index.erase(KeyIt);
+ continue;
+ }
+ // NOTE(review): operator[] inserts a default entry if the key is no longer in the
+ // index, and overwrites whatever location is currently stored - confirm this is the
+ // intended behavior when the key changed after the GC snapshot was taken.
+ m_Index[Entry.Key].Location = Entry.Location;
}
+ };
- TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate);
- TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate);
+ std::unordered_map<IoHash, DiskLocation> MovedBlockChunks;
+ for (uint32_t BlockIndex : BlocksToReWrite)
+ {
+ const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
- for (const auto& Entry : ValidEntries)
+ Ref<BlockStoreFile> OldBlockFile;
+ {
+ RwLock::SharedLockScope _i(m_IndexLock);
+ OldBlockFile = m_ChunkBlocks[BlockIndex];
+ }
+
+ const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex];
+ if (KeepMap.empty())
+ {
+ const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ std::vector<DiskIndexEntry> LogEntries = MakeDiskIndexEntries({}, DeleteMap);
+ m_SlogFile.Append(LogEntries);
+ m_SlogFile.Flush();
{
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
+ RwLock::ExclusiveLockScope _i(m_IndexLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ UpdateLocations(LogEntries);
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ }
+ AddToDeleted(DeleteMap);
+ ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'",
+ m_BucketDir / m_BucketName,
+ BlockIndex,
+ OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to flag file '{}' for deletion, reason: '{}'", OldBlockFile->GetPath(), Ec.message());
+ }
+ continue;
+ }
- DiskLocation NewLoc;
+ std::vector<uint8_t> Chunk;
+ for (const IoHash& ChunkHash : KeepMap)
+ {
+ auto KeyIt = Index.find(ChunkHash);
+ const BlockStoreLocation ChunkLocation = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment);
+ Chunk.resize(ChunkLocation.Size);
+ OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ if (!NewBlockFile || (WriteOffset + Chunk.size() > MaxBlockSize))
+ {
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed);
+ std::vector<DiskIndexEntry> LogEntries = MakeDiskIndexEntries(MovedBlockChunks, {});
+ m_SlogFile.Append(LogEntries);
+ m_SlogFile.Flush();
+
+ if (NewBlockFile)
{
- NewLoc = DiskLocation(0, Loc.Size(), 0, Loc.GetFlags());
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
}
- else
{
- Chunk.resize(Loc.Size());
- m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset());
-
- NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, Loc.GetFlags());
- TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor);
- TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16);
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ UpdateLocations(LogEntries);
+ // Every usable block index is taken; give up instead of probing forever in the loop below.
+ if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ // Report the limit that is actually enforced by the check above.
+ ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exceeded",
+ m_BucketDir / m_BucketName,
+ static_cast<uint64_t>(BlockStoreDiskLocation::MaxBlockIndex));
+ return;
+ }
+ while (m_ChunkBlocks.contains(NextBlockIndex))
+ {
+ NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
+ }
+ std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
}
- TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc});
+ MovedCount += MovedBlockChunks.size();
+ MovedBlockChunks.clear();
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
+ return;
+ }
+ if (Space.Free < MaxBlockSize)
+ {
+ uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve();
+ if (Space.Free + ReclaimedSpace < MaxBlockSize)
+ {
+ ZEN_WARN("garbage collect from '{}' FAILED, required disk space {}, free {}",
+ m_BucketDir / m_BucketName,
+ MaxBlockSize,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ RwLock::ExclusiveLockScope _l(m_IndexLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ m_ChunkBlocks.erase(NextBlockIndex);
+ return;
+ }
+
+ ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ m_BucketDir / m_BucketName,
+ ReclaimedSpace,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ }
+ NewBlockFile->Create(MaxBlockSize);
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
}
- }
- // Swap state
- try
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ MovedBlockChunks.emplace(ChunkHash,
+ DiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()},
+ m_PayloadAlignment,
+ KeyIt->second.Location.Flags));
+ WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment);
+ }
+ Chunk.clear();
+ if (NewBlockFile)
{
- fs::path SobsPath{m_BucketDir / "zen.sobs"};
- fs::path SlogPath{m_BucketDir / "zen.slog"};
-
- m_SobsFile.Close();
- m_SlogFile.Close();
-
- fs::remove(SobsPath);
- fs::remove(SlogPath);
-
- fs::rename(TmpSobsPath, SobsPath);
- fs::rename(TmpSlogPath, SlogPath);
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ NewBlockFile = {};
+ }
- const bool IsNew = false;
- OpenLog(m_BucketDir, IsNew);
+ const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ std::vector<DiskIndexEntry> LogEntries = MakeDiskIndexEntries(MovedBlockChunks, DeleteMap);
+ m_SlogFile.Append(LogEntries);
+ m_SlogFile.Flush();
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ UpdateLocations(LogEntries);
+ m_ChunkBlocks[BlockIndex] = nullptr;
}
- catch (std::exception& Err)
+ MovedCount += MovedBlockChunks.size();
+ AddToDeleted(DeleteMap);
+ MovedBlockChunks.clear();
+
+ ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'",
+ m_BucketDir / m_BucketName,
+ BlockIndex,
+ OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
{
- ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what());
- ResetSobStorage();
+ ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
}
+ OldBlockFile = nullptr;
}
}
@@ -1144,16 +2316,20 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
// Update index
- uint64_t EntryFlags = DiskLocation::kStandaloneFile;
+ uint8_t EntryFlags = DiskLocation::kStandaloneFile;
if (Value.Value.GetContentType() == ZenContentType::kCbObject)
{
EntryFlags |= DiskLocation::kStructured;
}
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags);
+ DiskLocation Loc(Value.Value.Size(), EntryFlags);
IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
if (auto It = m_Index.find(HashKey); It == m_Index.end())
@@ -1255,10 +2431,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
auto It = m_Buckets.try_emplace(BucketName, BucketName);
Bucket = &It.first->second;
- std::filesystem::path bucketPath = m_RootDir;
- bucketPath /= BucketName;
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName;
- Bucket->OpenOrCreate(bucketPath);
+ Bucket->OpenOrCreate(BucketPath);
}
}
@@ -1363,11 +2539,11 @@ void
ZenCacheDiskLayer::Flush()
{
std::vector<CacheBucket*> Buckets;
- Buckets.reserve(m_Buckets.size());
{
RwLock::SharedLockScope _(m_Lock);
+ Buckets.reserve(m_Buckets.size());
for (auto& Kv : m_Buckets)
{
Buckets.push_back(&Kv.second);
@@ -1419,6 +2595,9 @@ ZenCacheDiskLayer::TotalSize() const
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
+}
+
+namespace zen {
using namespace std::literals;
@@ -1427,10 +2606,18 @@ namespace testutils {
// Test helper: builds a binary cache value of `Size` bytes.
// Byte Idx is first set to the low 8 bits of Idx, then all bytes are shuffled with a
// function-local static std::mt19937 seeded from std::random_device, so the content
// differs from run to run. The result is cloned into an IoBuffer tagged as kBinary.
// NOTE(review): the static generator is shared by all callers without synchronization -
// confirm this helper is never called from several threads at once.
IoBuffer CreateBinaryCacheValue(uint64_t Size)
{
- std::vector<uint32_t> Data(size_t(Size / sizeof(uint32_t)));
- std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; });
+ static std::random_device rd;
+ static std::mt19937 g(rd());
+
+ std::vector<uint8_t> Values;
+ Values.resize(Size);
+ for (size_t Idx = 0; Idx < Size; ++Idx)
+ {
+ Values[Idx] = static_cast<uint8_t>(Idx);
+ }
+ std::shuffle(Values.begin(), Values.end(), g);
- IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t));
+ IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size());
Buf.SetContentType(ZenContentType::kBinary);
return Buf;
};
@@ -1737,6 +2924,7 @@ TEST_CASE("z$.gc")
GcCtx.MaxCacheDuration(std::chrono::minutes(2));
GcCtx.CollectSmallObjects(true);
+ Zcs.Flush();
Gc.CollectGarbage(GcCtx);
for (const auto& Key : Keys)
@@ -1751,6 +2939,357 @@ TEST_CASE("z$.gc")
}
}
+// Checks that a bucket written in the legacy on-disk layout (a single data file plus a
+// legacy log) can be opened by the current store and that its values are still readable.
+//
+// Steps:
+//  1. Populate a bucket through ZenCacheStore, then run GC while contributing every
+//     even-indexed chunk hash as "keep".
+//  2. Rewrite the bucket by hand into the legacy layout: block 1 is renamed to the legacy
+//     data path and every entry from the index file and the log is re-appended to a legacy
+//     log as a LegacyDiskIndexEntry.
+//  3. Reopen the store on the same directory and verify that even-indexed chunks are
+//     returned with matching hashes while odd-indexed chunks are not found.
+TEST_CASE("z$.legacyconversion")
+{
+    ScopedTemporaryDirectory TempDir;
+
+    // Mix of small payloads and a few just above 64 KiB. Sizes repeat on purpose; the payload
+    // generator shuffles its bytes, so equal sizes still produce distinct hashes.
+    const uint64_t ChunkSizes[] = {2041,
+                                   1123,
+                                   1223,
+                                   1239,
+                                   341,
+                                   1412,
+                                   912,
+                                   774,
+                                   341,
+                                   431,
+                                   554,
+                                   1098,
+                                   2048,
+                                   339 + 64 * 1024,
+                                   561 + 64 * 1024,
+                                   16 + 64 * 1024,
+                                   16 + 64 * 1024,
+                                   2048,
+                                   2048};
+    const size_t ChunkCount = std::size(ChunkSizes);
+
+    std::vector<IoBuffer> Chunks;
+    Chunks.reserve(ChunkCount);
+    for (uint64_t Size : ChunkSizes)
+    {
+        Chunks.push_back(testutils::CreateBinaryCacheValue(Size));
+    }
+
+    std::vector<IoHash> ChunkHashes;
+    ChunkHashes.reserve(ChunkCount);
+    for (const IoBuffer& Chunk : Chunks)
+    {
+        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+    }
+
+    CreateDirectories(TempDir.Path());
+
+    const std::string Bucket = "rightintwo";
+
+    // Phase 1: fill the bucket and garbage collect, keeping only the even-indexed chunks.
+    {
+        CasGc                      Gc;
+        ZenCacheStore              Zcs(Gc, TempDir.Path());
+        const GcClock::TimePoint   CurrentTime = GcClock::Now();
+
+        for (size_t i = 0; i < ChunkCount; i++)
+        {
+            Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]});
+        }
+
+        std::vector<IoHash> KeepChunks;
+        for (size_t i = 0; i < ChunkCount; i += 2)
+        {
+            KeepChunks.push_back(ChunkHashes[i]);
+        }
+        // The GC clock is placed two hours ahead with a two minute cache duration, so every
+        // entry written above is older than the allowed duration.
+        GcContext GcCtx(CurrentTime + std::chrono::hours(2));
+        GcCtx.MaxCacheDuration(std::chrono::minutes(2));
+        GcCtx.CollectSmallObjects(true);
+        GcCtx.ContributeCas(KeepChunks);
+        Zcs.Flush();
+        Gc.CollectGarbage(GcCtx);
+    }
+
+    // Phase 2: convert the bucket directory back to the legacy layout by hand.
+    std::filesystem::path BucketDir     = TempDir.Path() / Bucket;
+    std::filesystem::path BlocksBaseDir = BucketDir / "blocks";
+
+    // NOTE(review): assumes all surviving payloads live in block 1 - confirm against the GC
+    // compaction behaviour if the chunk sizes above are changed.
+    std::filesystem::path CasPath        = GetBlockPath(BlocksBaseDir, 1);
+    std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir);
+    std::filesystem::remove(LegacyDataPath);
+    std::filesystem::rename(CasPath, LegacyDataPath);
+
+    // Gather every entry the current format knows about: first the index snapshot (if any),
+    // then whatever the log holds.
+    std::vector<DiskIndexEntry> LogEntries;
+    std::filesystem::path       IndexPath = GetIndexPath(BucketDir, Bucket);
+    if (std::filesystem::is_regular_file(IndexPath))
+    {
+        BasicFile ObjectIndexFile;
+        ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+        uint64_t Size = ObjectIndexFile.FileSize();
+        if (Size >= sizeof(CacheBucketIndexHeader))
+        {
+            // Entries follow the fixed-size header, so the header size itself has to be
+            // subtracted. (The previous sizeof(sizeof(...)) subtracted sizeof(size_t) instead,
+            // which can make the entry-count check below reject a valid index.)
+            uint64_t               ExpectedEntryCount = (Size - sizeof(CacheBucketIndexHeader)) / sizeof(DiskIndexEntry);
+            CacheBucketIndexHeader Header;
+            ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+            if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion &&
+                Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
+            {
+                LogEntries.resize(Header.EntryCount);
+                ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+            }
+        }
+        ObjectIndexFile.Close();
+        std::filesystem::remove(IndexPath);
+    }
+
+    std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket);
+    {
+        TCasLogFile<DiskIndexEntry> CasLog;
+        CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+        LogEntries.reserve(CasLog.GetLogCount());
+        CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
+    }
+
+    // Re-emit every entry in the legacy record format.
+    TCasLogFile<LegacyDiskIndexEntry> LegacyLog;
+    std::filesystem::path             LegacylogPath = GetLegacyLogPath(BucketDir);
+    LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
+
+    for (const DiskIndexEntry& Entry : LogEntries)
+    {
+        uint64_t Size;
+        uint64_t Offset;
+        if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+        {
+            Size   = Entry.Location.Location.StandaloneSize;
+            Offset = 0;
+        }
+        else
+        {
+            // NOTE(review): 16 is presumably the bucket's payload alignment - confirm it matches
+            // the value the store was created with.
+            BlockStoreLocation Location = Entry.Location.GetBlockLocation(16);
+            Size                        = Location.Size;
+            Offset                      = Location.Offset;
+        }
+        // The entry flags are shifted into the top byte of the legacy location's flags argument.
+        LegacyDiskLocation   LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56);
+        LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation};
+        LegacyLog.Append(LegacyEntry);
+    }
+    LegacyLog.Close();
+
+    // Remove everything that belongs to the current format so only legacy files remain.
+    std::filesystem::remove_all(BlocksBaseDir);
+    std::filesystem::remove(LogPath);
+    std::filesystem::remove(IndexPath);
+
+    // Phase 3: reopen on the legacy files and verify the contents.
+    {
+        CasGc         Gc;
+        ZenCacheStore Zcs(Gc, TempDir.Path());
+
+        for (size_t i = 0; i < ChunkCount; i += 2)
+        {
+            ZenCacheValue Value;
+            CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value));
+            CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value));
+            // ChunkCount is odd, so the last kept chunk has no odd-indexed neighbour; without
+            // this guard the lookup would index one past the end of ChunkHashes.
+            if (i + 1 < ChunkCount)
+            {
+                CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value));
+            }
+        }
+    }
+}
+
+# if 0 // NOTE(review): compiled out. The test drives CasContainerStrategy rather than ZenCacheStore and uses names (CreateChunk, unqualified HashBuffer) that are not defined in the visible part of this file - confirm before re-enabling.
+TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
+{ // Stress test: schedules concurrent inserts and lookups on a CAS container while garbage collection runs on the calling thread.
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const uint64_t kChunkSize = 1048; // Size passed to the payload generator for every chunk.
+ const int32_t kChunkCount = 8192; // Number of chunks in the initial batch and again in the later "new" batch.
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(kChunkCount);
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(kChunkCount);
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk); // NOTE(review): unqualified HashBuffer; the rest of this file calls IoHash::HashBuffer - confirm which overload is intended.
+ ChunkHashes.emplace_back(Hash);
+ Chunks.emplace_back(Chunk);
+ }
+
+ WorkerThreadPool ThreadPool(4); // presumably four worker threads - TODO confirm the constructor argument.
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 32768, 16, true); // NOTE(review): argument meanings are not visible here (likely block size and alignment) - confirm.
+ {
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ const IoBuffer& Chunk = Chunks[Idx];
+ const IoHash& Hash = ChunkHashes[Idx];
+ ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { // Chunk and Hash are captured by value, so each task owns its own copies.
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ });
+ }
+ while (ThreadPool.PendingWork() > 0) // Poll until the pool reports no pending work.
+ {
+ Sleep(1); // NOTE(review): Sleep is not declared in the visible source (Win32 API or project wrapper?) - confirm portability.
+ }
+ }
+
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize); // Expects the reported disk size to equal the sum of the raw payload sizes.
+
+ {
+ std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { // OldChunkHashes is captured by reference; the wait loop below runs before it leaves scope.
+ IoHash ChunkHash = OldChunkHashes[Idx];
+ IoBuffer Chunk = Cas.FindChunk(ChunkHash);
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ CHECK(ChunkHash == Hash);
+ });
+ }
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+ }
+
+ std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); // Hashes the test still expects to find in the store.
+ {
+ std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ std::vector<IoHash> NewChunkHashes;
+ NewChunkHashes.reserve(kChunkCount);
+ std::vector<IoBuffer> NewChunks;
+ NewChunks.reserve(kChunkCount);
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize); // NOTE(review): CreateChunk is not defined in the visible source; the first batch uses testutils::CreateBinaryCacheValue.
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunkHashes.emplace_back(Hash);
+ NewChunks.emplace_back(Chunk);
+ }
+
+ RwLock ChunkHashesLock;
+ std::atomic_uint32_t AddedChunkCount; // NOTE(review): no initializer; a default-constructed std::atomic is only value-initialized from C++20 on.
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ const IoBuffer& Chunk = NewChunks[Idx];
+ const IoHash& Hash = NewChunkHashes[Idx];
+ ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { // Insert task: adds one new chunk and bumps the shared counter.
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ AddedChunkCount.fetch_add(1);
+ });
+ ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { // Lookup task: ChunkHashesLock is captured but never used in the body.
+ IoHash ChunkHash = OldChunkHashes[Idx];
+ IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]);
+ if (Chunk) // The chunk may already have been collected by the GC loop below, so a miss is tolerated.
+ {
+ CHECK(ChunkHash == IoHash::HashBuffer(Chunk));
+ }
+ });
+ }
+
+ while (AddedChunkCount.load() < kChunkCount) // Keep running GC passes on this thread until every insert task has finished.
+ {
+ std::vector<IoHash> AddedHashes;
+ {
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ AddedHashes.swap(NewChunkHashes); // After the first pass NewChunkHashes is empty, so later passes add nothing new.
+ }
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const IoHash& ChunkHash : AddedHashes)
+ {
+ if (Cas.HaveChunk(ChunkHash))
+ {
+ GcChunkHashes.emplace(ChunkHash);
+ }
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ size_t C = 0;
+ while (C < KeepHashes.size()) // Thin out the keep list so the GC pass has something to delete.
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; // Overwrite with the last element, then shrink: drops entry C without shifting.
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); // Stop tracking whatever this pass reported as deleted.
+ }
+
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+
+ { // One more GC pass once the pool is idle, this time dropping roughly every 77th keep entry.
+ std::vector<IoHash> AddedHashes;
+ {
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ AddedHashes.swap(NewChunkHashes);
+ }
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const IoHash& ChunkHash : AddedHashes)
+ {
+ if (Cas.HaveChunk(ChunkHash))
+ {
+ GcChunkHashes.emplace(ChunkHash);
+ }
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 77 == 0 && C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+ }
+ { // Final verification: every hash still tracked must be present and hash back to itself.
+ for (const IoHash& ChunkHash : GcChunkHashes)
+ {
+ ThreadPool.ScheduleWork([&Cas, ChunkHash]() {
+ CHECK(Cas.HaveChunk(ChunkHash));
+ CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ });
+ }
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+ }
+ }
+}
+# endif
+
#endif
void