diff options
| author | Per Larsson <[email protected]> | 2021-12-07 11:06:22 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-12-07 11:06:22 +0100 |
| commit | fdcd769a821adb03b3cf79bb873fe46dfeb02e55 (patch) | |
| tree | c1fea6b5a9a94403226bc43874a4c65778ddb5a9 /zenserver/cache/structuredcachestore.cpp | |
| parent | Fixed bug in container GC. (diff) | |
| download | zen-fdcd769a821adb03b3cf79bb873fe46dfeb02e55.tar.xz zen-fdcd769a821adb03b3cf79bb873fe46dfeb02e55.zip | |
Added support for a time-based eviction policy in the structured cache.
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 324 |
1 files changed, 213 insertions, 111 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 62c59a0ef..2b213b9b0 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -4,7 +4,6 @@ #include "cachetracking.h" -#include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> @@ -12,7 +11,6 @@ #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> -#include <zencore/iobuffer.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/string.h> @@ -21,16 +19,13 @@ #include <zencore/thread.h> #include <zencore/windows.h> #include <zenstore/basicfile.h> -#include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/cidstore.h> -#include <zenstore/gc.h> +#include <chrono> #include <concepts> -#include <filesystem> #include <memory_resource> #include <ranges> -#include <unordered_map> ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/core.h> @@ -43,6 +38,29 @@ namespace zen { using namespace fmt::literals; +static CbObject +LoadCompactBinaryObject(const std::filesystem::path& Path) +{ + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); +} + +static void +SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object) +{ + WriteFile(Path, Object.GetBuffer().AsIoBuffer()); +} + ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) : GcContributor(Gc), m_DiskLayer(RootDir) { ZEN_INFO("initializing structured cache at '{}'", RootDir); @@ -310,25 +328,20 @@ ZenCacheMemoryLayer::CacheBucket::GatherReferences(GcContext& GcCtx) RwLock::SharedLockScope _(m_bucketLock); - 
std::vector<IoHash> BadHashes; + std::vector<IoHash> Cids; for (const auto& Kv : m_cacheMap) { const IoBuffer& Payload = Kv.second.Payload; - - switch (Payload.GetContentType()) + if (Payload.GetContentType() != ZenContentType::kCbObject || GcCtx.Expired(Kv.second.LastAccess)) { - case ZenContentType::kCbObject: - { - CbObject Obj(SharedBuffer{Payload}); - - Obj.IterateAttachments([&](CbFieldView Field) { GcCtx.ContributeCids(std::array<IoHash, 1>{Field.AsAttachment()}); }); - } - break; - - case ZenContentType::kBinary: - break; + continue; } + + Cids.clear(); + CbObject Obj(SharedBuffer{Payload}); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); + GcCtx.ContributeCids(Cids); } } @@ -345,24 +358,18 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV { BucketValue& Value = bucketIt.value(); OutValue.Value = Value.Payload; - Value.LastAccess = GetCurrentTimeStamp(); + Value.LastAccess = GcClock::TickCount(); return true; } } -uint64_t -ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp() -{ - return GetLofreqTimerValue(); -} - void ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { { RwLock::ExclusiveLockScope _(m_bucketLock); - m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value}); + m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GcClock::TickCount(), .Payload = Value.Value}); } m_TotalSize.fetch_add(Value.Value.GetSize()); @@ -452,10 +459,16 @@ private: BasicFile m_SobsFile; TCasLogFile<DiskIndexEntry> m_SlogFile; - RwLock m_IndexLock; - tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index; - uint64_t m_WriteCursor = 0; - std::atomic_uint64_t m_TotalSize{}; + struct IndexEntry + { + DiskLocation Location; + GcClock::Tick LastAccess{}; + }; + + RwLock m_IndexLock; + tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher> m_Index; + uint64_t m_WriteCursor = 0; + 
std::atomic_uint64_t m_TotalSize{}; void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); @@ -497,6 +510,8 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) void ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { + using namespace std::literals; + CreateDirectories(BucketDir); m_BucketDir = BucketDir; @@ -505,55 +520,29 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"}; std::filesystem::path SlogPath{m_BucketDir / "zen.slog"}; - BasicFile ManifestFile; - - // Try opening existing manifest file first - bool IsNew = false; - std::error_code Ec; - ManifestFile.Open(ManifestPath, /* IsCreate */ false, Ec); + CbObject Manifest = LoadCompactBinaryObject(ManifestPath); - if (!Ec) + if (Manifest) { - uint64_t FileSize = ManifestFile.FileSize(); - - if (FileSize == sizeof(Oid)) - { - ManifestFile.Read(&m_BucketId, sizeof(Oid), 0); - - m_IsOk = true; - } - - if (!m_IsOk) - { - ManifestFile.Close(); - } + m_BucketId = Manifest["BucketId"].AsObjectId(); + m_IsOk = m_BucketId != Oid::Zero; } - - if (!m_IsOk) + else if (AllowCreate) { - if (AllowCreate == false) - { - // Invalid bucket - return; - } - - // No manifest file found, this is a new bucket - - ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec); - - if (Ec) - { - throw std::system_error(Ec, "Failed to create bucket manifest '{}'"_format(ManifestPath)); - } - m_BucketId.Generate(); - ManifestFile.Write(&m_BucketId, sizeof(Oid), /* FileOffset */ 0); - + CbObjectWriter Writer; + Writer << "BucketId"sv << m_BucketId; + Manifest = Writer.Save(); + SaveCompactBinaryObject(ManifestPath, Manifest); IsNew = true; } + else + { + return; + } // Initialize small object storage related files @@ -579,7 +568,7 @@ 
ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } else { - m_Index[Entry.Key] = Entry.Location; + m_Index[Entry.Key] = {.Location = Entry.Location, .LastAccess = GcClock::TickCount()}; m_TotalSize.fetch_add(Entry.Location.Size()); } MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); @@ -593,6 +582,17 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo m_WriteCursor = (MaxFileOffset + 15) & ~15; } + for (CbFieldView Entry : Manifest["Timestamps"]) + { + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); + + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + It.value().LastAccess = Obj["LastAccess"sv].AsInt64(); + } + } + m_IsOk = true; } @@ -654,18 +654,19 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal RwLock::SharedLockScope _(m_IndexLock); - if (auto it = m_Index.find(HashKey); it != m_Index.end()) + if (auto It = m_Index.find(HashKey); It != m_Index.end()) { - const DiskLocation& Loc = it->second; + IndexEntry& Entry = It.value(); + Entry.LastAccess = GcClock::TickCount(); - if (GetInlineCacheValue(Loc, OutValue)) + if (GetInlineCacheValue(Entry.Location, OutValue)) { return true; } _.ReleaseNow(); - return GetStandaloneCacheValue(Loc, HashKey, OutValue); + return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue); } return false; @@ -700,17 +701,19 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16); - if (auto it = m_Index.find(HashKey); it == m_Index.end()) + if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object - m_Index.insert({HashKey, Loc}); + m_Index.insert({HashKey, {Loc, GcClock::TickCount()}}); } else { // TODO: should check if write is idempotent and bail out if it is? 
// this would requiring comparing contents on disk unless we add a // content hash to the index entry - it.value() = Loc; + IndexEntry& Entry = It.value(); + Entry.Location = Loc; + Entry.LastAccess = GcClock::TickCount(); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); @@ -732,8 +735,32 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { + using namespace std::literals; + + RwLock::SharedLockScope _(m_IndexLock); + m_SobsFile.Flush(); m_SlogFile.Flush(); + + // Update manifest + { + CbObjectWriter Writer; + Writer << "BucketId"sv << m_BucketId; + + if (!m_Index.empty()) + { + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : m_Index) + { + const IoHash& Key = Kv.first; + const IndexEntry& Entry = Kv.second; + Writer << "Key"sv << Key << "LastAccess"sv << Entry.LastAccess; + } + Writer.EndArray(); + } + + SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); + } } void @@ -747,7 +774,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) for (auto& Kv : m_Index) { const IoHash& HashKey = Kv.first; - const DiskLocation& Loc = Kv.second; + const DiskLocation& Loc = Kv.second.Location; ZenCacheValue Value; @@ -782,7 +809,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) // Log a tombstone and delete the in-memory index for the bad entry const auto It = m_Index.find(BadKey); - const DiskLocation& Location = It->second; + const DiskLocation& Location = It->second.Location; m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}}); m_Index.erase(BadKey); } @@ -794,41 +821,34 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope _(m_IndexLock); + std::vector<IoHash> Cids; + for (auto& Kv : m_Index) { const IoHash& HashKey = Kv.first; - const DiskLocation& Loc = Kv.second; + const DiskLocation& Loc = Kv.second.Location; - if (Loc.IsFlagSet(DiskLocation::kStructured) == false) + if 
(Loc.IsFlagSet(DiskLocation::kStructured) == false || Loc.IsFlagSet(DiskLocation::kTombStone) || + GcCtx.Expired(Kv.second.LastAccess)) { continue; } - ZenCacheValue CacheValue; - std::vector<IoHash> Attachments; + ZenCacheValue CacheValue; + if (!GetInlineCacheValue(Loc, CacheValue)) + { + GetStandaloneCacheValue(Loc, HashKey, CacheValue); + } - auto GatherRefs = [&] { + if (CacheValue.Value) + { ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); + Cids.clear(); CbObject Obj(SharedBuffer{CacheValue.Value}); - Obj.IterateAttachments([&](CbFieldView Field) { Attachments.push_back(Field.AsAttachment()); }); - GcCtx.ContributeCids(Attachments); - }; - - if (GetInlineCacheValue(Loc, /* out */ CacheValue)) - { - GatherRefs(); - } - else if (GetStandaloneCacheValue(Loc, HashKey, /* out */ CacheValue)) - { - GatherRefs(); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); + GcCtx.ContributeCids(Cids); } - else - { - // Value not found - } - - GcCtx.ContributeCids(Attachments); } } @@ -910,16 +930,17 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); + IndexEntry Entry = IndexEntry{.Location = Loc, .LastAccess = GcClock::TickCount()}; - if (auto it = m_Index.find(HashKey); it == m_Index.end()) + if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object - m_Index.insert({HashKey, Loc}); + m_Index.insert({HashKey, Entry}); } else { // TODO: should check if write is idempotent and bail out if it is? 
- it.value() = Loc; + It.value() = Entry; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); @@ -1169,11 +1190,17 @@ ZenCacheDiskLayer::TotalSize() const #if ZEN_WITH_TESTS +using namespace std::literals; +using namespace fmt::literals; + +namespace testutils { + + IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); } + +} // namespace testutils + TEST_CASE("zcache.store") { - using namespace fmt::literals; - using namespace std::literals; - ScopedTemporaryDirectory TempDir; CasGc Gc; @@ -1214,9 +1241,6 @@ TEST_CASE("zcache.store") TEST_CASE("zcache.size") { - using namespace fmt::literals; - using namespace std::literals; - const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector<uint8_t> Buf; Buf.resize(Size); @@ -1313,6 +1337,84 @@ TEST_CASE("zcache.size") } } +TEST_CASE("zcache.gc") +{ + using namespace testutils; + + SUBCASE("gather references does NOT add references for expired cache entries") + { + ScopedTemporaryDirectory TempDir; + std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)}; + + const auto CollectAndFilter = [](CasGc& Gc, + GcClock::TimePoint Time, + GcClock::Duration MaxDuration, + std::span<const IoHash> Cids, + std::vector<IoHash>& OutKeep) { + GcContext GcCtx(Time); + GcCtx.MaxCacheDuration(MaxDuration); + Gc.CollectGarbage(GcCtx); + OutKeep.clear(); + GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); }); + }; + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "teardrinker"sv; + + // Create a cache record + const IoHash Key = CreateKey(42); + CbObjectWriter Record; + Record << "Key"sv + << "SomeRecord"sv; + + for (size_t Idx = 0; auto& Cid : Cids) + { + Record.AddBinaryAttachment("attachment-{}"_format(Idx++), Cid); + } + + IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + Zcs.Put(Bucket, Key, {.Value = Buffer}); + + std::vector<IoHash> 
Keep; + + // Collect garbage with 1 hour max cache duration + { + CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(Cids.size(), Keep.size()); + } + + // Move forward in time + { + CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(0, Keep.size()); + } + } + + // Expect timestamps to be serialized + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + std::vector<IoHash> Keep; + + // Collect garbage with 1 hour max cache duration + { + CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(3, Keep.size()); + } + + // Move forward in time + { + CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(0, Keep.size()); + } + } + } +} + #endif void |