aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-12-07 11:06:22 +0100
committerPer Larsson <[email protected]>2021-12-07 11:06:22 +0100
commitfdcd769a821adb03b3cf79bb873fe46dfeb02e55 (patch)
treec1fea6b5a9a94403226bc43874a4c65778ddb5a9 /zenserver/cache/structuredcachestore.cpp
parentFixed bug in container GC. (diff)
downloadzen-fdcd769a821adb03b3cf79bb873fe46dfeb02e55.tar.xz
zen-fdcd769a821adb03b3cf79bb873fe46dfeb02e55.zip
Added support for time based eviction policy in structured cache.
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp324
1 files changed, 213 insertions, 111 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 62c59a0ef..2b213b9b0 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -4,7 +4,6 @@
#include "cachetracking.h"
-#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
@@ -12,7 +11,6 @@
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
-#include <zencore/iobuffer.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
@@ -21,16 +19,13 @@
#include <zencore/thread.h>
#include <zencore/windows.h>
#include <zenstore/basicfile.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
-#include <zenstore/gc.h>
+#include <chrono>
#include <concepts>
-#include <filesystem>
#include <memory_resource>
#include <ranges>
-#include <unordered_map>
ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/core.h>
@@ -43,6 +38,29 @@ namespace zen {
using namespace fmt::literals;
+static CbObject
+LoadCompactBinaryObject(const std::filesystem::path& Path)
+{
+ FileContents Result = ReadFile(Path);
+
+ if (!Result.ErrorCode)
+ {
+ IoBuffer Buffer = Result.Flatten();
+ if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+ {
+ return LoadCompactBinaryObject(Buffer);
+ }
+ }
+
+ return CbObject();
+}
+
+static void
+SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object)
+{
+ WriteFile(Path, Object.GetBuffer().AsIoBuffer());
+}
+
ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) : GcContributor(Gc), m_DiskLayer(RootDir)
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
@@ -310,25 +328,20 @@ ZenCacheMemoryLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
RwLock::SharedLockScope _(m_bucketLock);
- std::vector<IoHash> BadHashes;
+ std::vector<IoHash> Cids;
for (const auto& Kv : m_cacheMap)
{
const IoBuffer& Payload = Kv.second.Payload;
-
- switch (Payload.GetContentType())
+ if (Payload.GetContentType() != ZenContentType::kCbObject || GcCtx.Expired(Kv.second.LastAccess))
{
- case ZenContentType::kCbObject:
- {
- CbObject Obj(SharedBuffer{Payload});
-
- Obj.IterateAttachments([&](CbFieldView Field) { GcCtx.ContributeCids(std::array<IoHash, 1>{Field.AsAttachment()}); });
- }
- break;
-
- case ZenContentType::kBinary:
- break;
+ continue;
}
+
+ Cids.clear();
+ CbObject Obj(SharedBuffer{Payload});
+ Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
+ GcCtx.ContributeCids(Cids);
}
}
@@ -345,24 +358,18 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
{
BucketValue& Value = bucketIt.value();
OutValue.Value = Value.Payload;
- Value.LastAccess = GetCurrentTimeStamp();
+ Value.LastAccess = GcClock::TickCount();
return true;
}
}
-uint64_t
-ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp()
-{
- return GetLofreqTimerValue();
-}
-
void
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
{
RwLock::ExclusiveLockScope _(m_bucketLock);
- m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value});
+ m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GcClock::TickCount(), .Payload = Value.Value});
}
m_TotalSize.fetch_add(Value.Value.GetSize());
@@ -452,10 +459,16 @@ private:
BasicFile m_SobsFile;
TCasLogFile<DiskIndexEntry> m_SlogFile;
- RwLock m_IndexLock;
- tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index;
- uint64_t m_WriteCursor = 0;
- std::atomic_uint64_t m_TotalSize{};
+ struct IndexEntry
+ {
+ DiskLocation Location;
+ GcClock::Tick LastAccess{};
+ };
+
+ RwLock m_IndexLock;
+ tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher> m_Index;
+ uint64_t m_WriteCursor = 0;
+ std::atomic_uint64_t m_TotalSize{};
void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
@@ -497,6 +510,8 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir)
void
ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
{
+ using namespace std::literals;
+
CreateDirectories(BucketDir);
m_BucketDir = BucketDir;
@@ -505,55 +520,29 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"};
std::filesystem::path SlogPath{m_BucketDir / "zen.slog"};
- BasicFile ManifestFile;
-
- // Try opening existing manifest file first
-
bool IsNew = false;
- std::error_code Ec;
- ManifestFile.Open(ManifestPath, /* IsCreate */ false, Ec);
+ CbObject Manifest = LoadCompactBinaryObject(ManifestPath);
- if (!Ec)
+ if (Manifest)
{
- uint64_t FileSize = ManifestFile.FileSize();
-
- if (FileSize == sizeof(Oid))
- {
- ManifestFile.Read(&m_BucketId, sizeof(Oid), 0);
-
- m_IsOk = true;
- }
-
- if (!m_IsOk)
- {
- ManifestFile.Close();
- }
+ m_BucketId = Manifest["BucketId"].AsObjectId();
+ m_IsOk = m_BucketId != Oid::Zero;
}
-
- if (!m_IsOk)
+ else if (AllowCreate)
{
- if (AllowCreate == false)
- {
- // Invalid bucket
- return;
- }
-
- // No manifest file found, this is a new bucket
-
- ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec);
-
- if (Ec)
- {
- throw std::system_error(Ec, "Failed to create bucket manifest '{}'"_format(ManifestPath));
- }
-
m_BucketId.Generate();
- ManifestFile.Write(&m_BucketId, sizeof(Oid), /* FileOffset */ 0);
-
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << m_BucketId;
+ Manifest = Writer.Save();
+ SaveCompactBinaryObject(ManifestPath, Manifest);
IsNew = true;
}
+ else
+ {
+ return;
+ }
// Initialize small object storage related files
@@ -579,7 +568,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
else
{
- m_Index[Entry.Key] = Entry.Location;
+ m_Index[Entry.Key] = {.Location = Entry.Location, .LastAccess = GcClock::TickCount()};
m_TotalSize.fetch_add(Entry.Location.Size());
}
MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size());
@@ -593,6 +582,17 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
m_WriteCursor = (MaxFileOffset + 15) & ~15;
}
+ for (CbFieldView Entry : Manifest["Timestamps"])
+ {
+ const CbObjectView Obj = Entry.AsObjectView();
+ const IoHash Key = Obj["Key"sv].AsHash();
+
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ It.value().LastAccess = Obj["LastAccess"sv].AsInt64();
+ }
+ }
+
m_IsOk = true;
}
@@ -654,18 +654,19 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
RwLock::SharedLockScope _(m_IndexLock);
- if (auto it = m_Index.find(HashKey); it != m_Index.end())
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
- const DiskLocation& Loc = it->second;
+ IndexEntry& Entry = It.value();
+ Entry.LastAccess = GcClock::TickCount();
- if (GetInlineCacheValue(Loc, OutValue))
+ if (GetInlineCacheValue(Entry.Location, OutValue))
{
return true;
}
_.ReleaseNow();
- return GetStandaloneCacheValue(Loc, HashKey, OutValue);
+ return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue);
}
return false;
@@ -700,17 +701,19 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16);
- if (auto it = m_Index.find(HashKey); it == m_Index.end())
+ if (auto It = m_Index.find(HashKey); It == m_Index.end())
{
// Previously unknown object
- m_Index.insert({HashKey, Loc});
+ m_Index.insert({HashKey, {Loc, GcClock::TickCount()}});
}
else
{
// TODO: should check if write is idempotent and bail out if it is?
// this would requiring comparing contents on disk unless we add a
// content hash to the index entry
- it.value() = Loc;
+ IndexEntry& Entry = It.value();
+ Entry.Location = Loc;
+ Entry.LastAccess = GcClock::TickCount();
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
@@ -732,8 +735,32 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
+ using namespace std::literals;
+
+ RwLock::SharedLockScope _(m_IndexLock);
+
m_SobsFile.Flush();
m_SlogFile.Flush();
+
+ // Update manifest
+ {
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << m_BucketId;
+
+ if (!m_Index.empty())
+ {
+ Writer.BeginArray("Timestamps"sv);
+ for (auto& Kv : m_Index)
+ {
+ const IoHash& Key = Kv.first;
+ const IndexEntry& Entry = Kv.second;
+ Writer << "Key"sv << Key << "LastAccess"sv << Entry.LastAccess;
+ }
+ Writer.EndArray();
+ }
+
+ SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save());
+ }
}
void
@@ -747,7 +774,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
for (auto& Kv : m_Index)
{
const IoHash& HashKey = Kv.first;
- const DiskLocation& Loc = Kv.second;
+ const DiskLocation& Loc = Kv.second.Location;
ZenCacheValue Value;
@@ -782,7 +809,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
// Log a tombstone and delete the in-memory index for the bad entry
const auto It = m_Index.find(BadKey);
- const DiskLocation& Location = It->second;
+ const DiskLocation& Location = It->second.Location;
m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}});
m_Index.erase(BadKey);
}
@@ -794,41 +821,34 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
RwLock::SharedLockScope _(m_IndexLock);
+ std::vector<IoHash> Cids;
+
for (auto& Kv : m_Index)
{
const IoHash& HashKey = Kv.first;
- const DiskLocation& Loc = Kv.second;
+ const DiskLocation& Loc = Kv.second.Location;
- if (Loc.IsFlagSet(DiskLocation::kStructured) == false)
+ if (Loc.IsFlagSet(DiskLocation::kStructured) == false || Loc.IsFlagSet(DiskLocation::kTombStone) ||
+ GcCtx.Expired(Kv.second.LastAccess))
{
continue;
}
- ZenCacheValue CacheValue;
- std::vector<IoHash> Attachments;
+ ZenCacheValue CacheValue;
+ if (!GetInlineCacheValue(Loc, CacheValue))
+ {
+ GetStandaloneCacheValue(Loc, HashKey, CacheValue);
+ }
- auto GatherRefs = [&] {
+ if (CacheValue.Value)
+ {
ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
+ Cids.clear();
CbObject Obj(SharedBuffer{CacheValue.Value});
- Obj.IterateAttachments([&](CbFieldView Field) { Attachments.push_back(Field.AsAttachment()); });
- GcCtx.ContributeCids(Attachments);
- };
-
- if (GetInlineCacheValue(Loc, /* out */ CacheValue))
- {
- GatherRefs();
- }
- else if (GetStandaloneCacheValue(Loc, HashKey, /* out */ CacheValue))
- {
- GatherRefs();
+ Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
+ GcCtx.ContributeCids(Cids);
}
- else
- {
- // Value not found
- }
-
- GcCtx.ContributeCids(Attachments);
}
}
@@ -910,16 +930,17 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
RwLock::ExclusiveLockScope _(m_IndexLock);
DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags);
+ IndexEntry Entry = IndexEntry{.Location = Loc, .LastAccess = GcClock::TickCount()};
- if (auto it = m_Index.find(HashKey); it == m_Index.end())
+ if (auto It = m_Index.find(HashKey); It == m_Index.end())
{
// Previously unknown object
- m_Index.insert({HashKey, Loc});
+ m_Index.insert({HashKey, Entry});
}
else
{
// TODO: should check if write is idempotent and bail out if it is?
- it.value() = Loc;
+ It.value() = Entry;
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
@@ -1169,11 +1190,17 @@ ZenCacheDiskLayer::TotalSize() const
#if ZEN_WITH_TESTS
+using namespace std::literals;
+using namespace fmt::literals;
+
+namespace testutils {
+
+ IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); }
+
+} // namespace testutils
+
TEST_CASE("zcache.store")
{
- using namespace fmt::literals;
- using namespace std::literals;
-
ScopedTemporaryDirectory TempDir;
CasGc Gc;
@@ -1214,9 +1241,6 @@ TEST_CASE("zcache.store")
TEST_CASE("zcache.size")
{
- using namespace fmt::literals;
- using namespace std::literals;
-
const auto CreateCacheValue = [](size_t Size) -> CbObject {
std::vector<uint8_t> Buf;
Buf.resize(Size);
@@ -1313,6 +1337,84 @@ TEST_CASE("zcache.size")
}
}
+TEST_CASE("zcache.gc")
+{
+ using namespace testutils;
+
+ SUBCASE("gather references does NOT add references for expired cache entries")
+ {
+ ScopedTemporaryDirectory TempDir;
+ std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ const auto CollectAndFilter = [](CasGc& Gc,
+ GcClock::TimePoint Time,
+ GcClock::Duration MaxDuration,
+ std::span<const IoHash> Cids,
+ std::vector<IoHash>& OutKeep) {
+ GcContext GcCtx(Time);
+ GcCtx.MaxCacheDuration(MaxDuration);
+ Gc.CollectGarbage(GcCtx);
+ OutKeep.clear();
+ GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); });
+ };
+
+ {
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ const auto Bucket = "teardrinker"sv;
+
+ // Create a cache record
+ const IoHash Key = CreateKey(42);
+ CbObjectWriter Record;
+ Record << "Key"sv
+ << "SomeRecord"sv;
+
+ for (size_t Idx = 0; auto& Cid : Cids)
+ {
+ Record.AddBinaryAttachment("attachment-{}"_format(Idx++), Cid);
+ }
+
+ IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ Zcs.Put(Bucket, Key, {.Value = Buffer});
+
+ std::vector<IoHash> Keep;
+
+ // Collect garbage with 1 hour max cache duration
+ {
+ CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(Cids.size(), Keep.size());
+ }
+
+ // Move forward in time
+ {
+ CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(0, Keep.size());
+ }
+ }
+
+ // Expect timestamps to be serialized
+ {
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ std::vector<IoHash> Keep;
+
+ // Collect garbage with 1 hour max cache duration
+ {
+ CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(3, Keep.size());
+ }
+
+ // Move forward in time
+ {
+ CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(0, Keep.size());
+ }
+ }
+ }
+}
+
#endif
void