aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-12-13 11:43:29 +0100
committerPer Larsson <[email protected]>2021-12-13 11:43:29 +0100
commit7d8f6c99372c6157cc4db02d7c249985c789fc7d (patch)
tree1a5527c9431d9382e0c48a06a626928c3bd745b5 /zenserver/cache/structuredcachestore.cpp
parentAdded support for triggering GC with different params and refactored GC sched... (diff)
downloadzen-7d8f6c99372c6157cc4db02d7c249985c789fc7d.tar.xz
zen-7d8f6c99372c6157cc4db02d7c249985c789fc7d.zip
Refactored z$ GC.
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp291
1 files changed, 163 insertions, 128 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index f74bb05c1..030588659 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -189,14 +189,17 @@ ZenCacheStore::Scrub(ScrubContext& Ctx)
void
ZenCacheStore::GatherReferences(GcContext& GcCtx)
{
- m_MemLayer.GatherReferences(GcCtx);
+ access_tracking::AccessTimes AccessTimes;
+ m_MemLayer.GatherAccessTimes(AccessTimes);
+
+ m_DiskLayer.UpdateAccessTimes(AccessTimes);
m_DiskLayer.GatherReferences(GcCtx);
}
void
ZenCacheStore::CollectGarbage(GcContext& GcCtx)
{
- m_MemLayer.CollectGarbage(GcCtx);
+ m_MemLayer.Reset();
m_DiskLayer.CollectGarbage(GcCtx);
}
@@ -289,22 +292,22 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
}
void
-ZenCacheMemoryLayer::GatherReferences(GcContext& GcCtx)
+ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes)
{
+ using namespace zen::access_tracking;
+
RwLock::SharedLockScope _(m_Lock);
for (auto& Kv : m_Buckets)
{
- Kv.second.GatherReferences(GcCtx);
+ std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first];
+ Kv.second.GatherAccessTimes(Bucket);
}
}
void
-ZenCacheMemoryLayer::CollectGarbage(GcContext& GcCtx)
+ZenCacheMemoryLayer::Reset()
{
- ZEN_UNUSED(GcCtx);
-
- // Just drop everything for now
RwLock::ExclusiveLockScope _(m_Lock);
m_Buckets.clear();
}
@@ -345,28 +348,12 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
void
-ZenCacheMemoryLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
+ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
{
- // Is it even meaningful to do this? The memory layer shouldn't
- // contain anything which is not already in the disk layer
-
RwLock::SharedLockScope _(m_bucketLock);
-
- std::vector<IoHash> Cids;
-
- for (const auto& Kv : m_cacheMap)
- {
- const IoBuffer& Payload = Kv.second.Payload;
- if (Payload.GetContentType() != ZenContentType::kCbObject || GcCtx.Expired(Kv.second.LastAccess))
- {
- continue;
- }
-
- Cids.clear();
- CbObject Obj(SharedBuffer{Payload});
- Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
- GcCtx.ContributeCids(Cids);
- }
+ std::transform(m_cacheMap.begin(), m_cacheMap.end(), std::back_inserter(AccessTimes), [](const auto& Kv) {
+ return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = Kv.second.LastAccess};
+ });
}
bool
@@ -470,6 +457,7 @@ struct ZenCacheDiskLayer::CacheBucket
void Scrub(ScrubContext& Ctx);
void GatherReferences(GcContext& GcCtx);
void CollectGarbage(GcContext& GcCtx);
+ void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
inline bool IsOk() const { return m_IsOk; }
inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }
@@ -690,6 +678,7 @@ ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& L
{
m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}});
m_Index.erase(HashKey);
+ m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
}
}
@@ -873,41 +862,61 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
void
ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
- RwLock::SharedLockScope _(m_IndexLock);
+ const GcClock::TimePoint ExpireTime =
+ GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration();
- std::vector<IoHash> Cids;
+ const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
- for (auto& Kv : m_Index)
- {
- const IoHash& HashKey = Kv.first;
- const DiskLocation& Loc = Kv.second.Location;
+ RwLock::ExclusiveLockScope _(m_IndexLock);
- if (!Loc.IsFlagSet(DiskLocation::kStructured) || Loc.IsFlagSet(DiskLocation::kTombStone))
- {
- continue;
- }
+ std::vector<IoHash> ValidKeys;
+ std::vector<IoHash> ExpiredKeys;
+ std::vector<IoHash> Cids;
+ std::vector<IndexMap::value_type> Entries(m_Index.begin(), m_Index.end());
- if (GcCtx.Expired(Kv.second.LastAccess))
- {
- continue;
- }
+ std::sort(Entries.begin(), Entries.end(), [](const auto& LHS, const auto& RHS) {
+ return LHS.second.LastAccess < RHS.second.LastAccess;
+ });
- ZenCacheValue CacheValue;
- if (!GetInlineCacheValue(Loc, CacheValue))
- {
- GetStandaloneCacheValue(Loc, HashKey, CacheValue);
- }
+ const auto ValidIt = std::lower_bound(Entries.begin(), Entries.end(), ExpireTicks, [](const auto& Kv, auto Ticks) {
+ const IndexEntry& Entry = Kv.second;
+ return Entry.LastAccess < Ticks;
+ });
- if (CacheValue.Value)
+ Cids.reserve(Entries.size());
+
+ for (auto Kv = ValidIt; Kv != Entries.end(); ++Kv)
+ {
+ const IoHash& Key = Kv->first;
+ const DiskLocation& Loc = Kv->second.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStructured))
{
- ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
+ ZenCacheValue CacheValue;
+ if (!GetInlineCacheValue(Loc, CacheValue))
+ {
+ GetStandaloneCacheValue(Loc, Key, CacheValue);
+ }
+
+ if (CacheValue.Value)
+ {
+ ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
- Cids.clear();
- CbObject Obj(SharedBuffer{CacheValue.Value});
- Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
- GcCtx.ContributeCids(Cids);
+ CbObject Obj(SharedBuffer{CacheValue.Value});
+ Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
+ GcCtx.ContributeCids(Cids);
+ }
}
}
+
+ ValidKeys.reserve(std::distance(ValidIt, Entries.end()));
+ ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt));
+
+ std::transform(ValidIt, Entries.end(), std::back_inserter(ValidKeys), [](const auto& Kv) { return Kv.first; });
+ std::transform(Entries.begin(), ValidIt, std::back_inserter(ExpiredKeys), [](const auto& Kv) { return Kv.first; });
+
+ GcCtx.ContributeCids(Cids);
+ GcCtx.ContributeCacheKeys(std::move(ValidKeys), std::move(ExpiredKeys));
}
void
@@ -919,14 +928,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
const uint64_t OldCount = m_Index.size();
const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
- uint64_t NewCount{};
ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir);
Stopwatch Timer;
- const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize, &NewCount] {
+ const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] {
+ const uint64_t NewCount = m_Index.size();
const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed);
- ZEN_INFO("garbage collect from '{}' DONE after {}, collected #{} {} chunks of total #{} {}",
+ ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})",
m_BucketDir,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
OldCount - NewCount,
@@ -941,56 +950,40 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
return;
}
- struct Candidate
- {
- IoHash Key;
- DiskLocation Loc;
- GcClock::Tick LastAccess;
-
- bool operator<(const Candidate& RHS) const { return LastAccess < RHS.LastAccess; }
+ auto AddEntries = [this](std::span<const IoHash> Keys, std::vector<IndexMap::value_type>& OutEntries) {
+ for (const IoHash& Key : Keys)
+ {
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ OutEntries.push_back(*It);
+ }
+ }
};
- std::vector<Candidate> Candidates;
- Candidates.reserve(m_Index.size());
-
- for (auto& Kv : m_Index)
- {
- Candidates.push_back({.Key = Kv.first, .Loc = Kv.second.Location, .LastAccess = Kv.second.LastAccess});
- }
-
- const GcClock::TimePoint Tp = GcCtx.Time() - GcCtx.MaxCacheDuration();
- const GcClock::Tick TicksAllowed = Tp.time_since_epoch().count();
-
- std::sort(Candidates.begin(), Candidates.end());
- const auto ValidIt = std::lower_bound(Candidates.begin(), Candidates.end(), TicksAllowed, [](const auto& C, auto Ticks) {
- return C.LastAccess < Ticks;
- });
-
- const size_t ValidIndex = std::distance(Candidates.begin(), ValidIt);
- NewCount = std::distance(ValidIt, Candidates.end());
+ std::vector<IndexMap::value_type> ValidEntries;
+ std::vector<IndexMap::value_type> ExpiredEntries;
- if (NewCount == OldCount)
- {
- return;
- }
-
- const std::span<Candidate const> ValidEntries(Candidates.begin() + ValidIndex, NewCount);
- const std::span<Candidate const> ExpiredEntries(Candidates.begin(), Candidates.size() - NewCount);
+ AddEntries(GcCtx.ValidCacheKeys(), ValidEntries);
+ AddEntries(GcCtx.ExpiredCacheKeys(), ExpiredEntries);
// Remove all standalone file(s)
+ // NOTE: This can probably be made asynchronously
{
std::error_code Ec;
PathBuilder Path;
for (const auto& Entry : ExpiredEntries)
{
- if (Entry.Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ const IoHash& Key = Entry.first;
+ const DiskLocation& Loc = Entry.second.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
Path.Reset();
- BuildPath(Path, Entry.Key);
+ BuildPath(Path, Key);
// NOTE: this will update index and log file
- DeleteStandaloneCacheValue(Entry.Loc, Entry.Key, Path.c_str(), Ec);
+ DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec);
if (Ec)
{
@@ -1003,9 +996,9 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty())
{
- // Super naive GC implementation for small objects. Needs enough free
- // disk space to store intermediate sobs files along side the
- // old files
+ // Naive GC implementation of small objects. Needs enough free
+ // disk space to store intermediate sob container along side the
+ // old container
const auto ResetSobStorage = [this, &ValidEntries]() {
m_SobsFile.Close();
@@ -1021,11 +1014,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
for (const auto& Entry : ValidEntries)
{
- if (Entry.Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ const IoHash& Key = Entry.first;
+ const DiskLocation& Loc = Entry.second.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- m_SlogFile.Append({.Key = Entry.Key, .Location = Entry.Loc});
- m_Index.insert({Entry.Key, {Entry.Loc, GcClock::TickCount()}});
- m_TotalSize.fetch_add(Entry.Loc.Size(), std::memory_order::relaxed);
+ m_SlogFile.Append({.Key = Key, .Location = Loc});
+ m_Index.insert({Key, {Loc, GcClock::TickCount()}});
+ m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
}
}
};
@@ -1033,9 +1029,11 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
uint64_t NewContainerSize{};
for (const auto& Entry : ValidEntries)
{
- if (Entry.Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false)
+ const DiskLocation& Loc = Entry.second.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false)
{
- NewContainerSize += (Entry.Loc.Size() + sizeof(DiskLocation));
+ NewContainerSize += (Loc.Size() + sizeof(DiskLocation));
}
}
@@ -1061,7 +1059,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"};
std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"};
- // Copy non expired sob(s) to temporary file(s)
+ // Copy non expired sob(s) to temporary sob container
{
BasicFile TmpSobs;
@@ -1074,23 +1072,26 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
for (const auto& Entry : ValidEntries)
{
+ const IoHash& Key = Entry.first;
+ const DiskLocation& Loc = Entry.second.Location;
+
DiskLocation NewLoc;
- if (Entry.Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- NewLoc = DiskLocation(0, Entry.Loc.Size(), 0, DiskLocation::kStandaloneFile);
+ NewLoc = DiskLocation(0, Loc.Size(), 0, DiskLocation::kStandaloneFile);
}
else
{
- Chunk.resize(Entry.Loc.Size());
- m_SobsFile.Read(Chunk.data(), Chunk.size(), Entry.Loc.Offset());
+ Chunk.resize(Loc.Size());
+ m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset());
NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, 0);
TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor);
TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16);
}
- TmpLog.Append(DiskIndexEntry{.Key = Entry.Key, .Location = NewLoc});
+ TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc});
}
}
@@ -1121,6 +1122,21 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
}
void
+ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
+{
+ using namespace access_tracking;
+
+ for (const KeyAccessTime& KeyTime : AccessTimes)
+ {
+ if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end())
+ {
+ IndexEntry& Entry = It.value();
+ Entry.LastAccess = KeyTime.LastAccess;
+ }
+ }
+}
+
+void
ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
RwLock::SharedLockScope _(m_Lock);
@@ -1132,6 +1148,21 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
}
void
+ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (const auto& Kv : AccessTimes.Buckets)
+ {
+ if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end())
+ {
+ CacheBucket& Bucket = It->second;
+ Bucket.UpdateAccessTimes(Kv.second);
+ }
+ }
+}
+
+void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
{
RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
@@ -1488,7 +1519,7 @@ namespace testutils {
} // namespace testutils
-TEST_CASE("zcache.store")
+TEST_CASE("z$.store")
{
ScopedTemporaryDirectory TempDir;
@@ -1528,7 +1559,7 @@ TEST_CASE("zcache.store")
}
}
-TEST_CASE("zcache.size")
+TEST_CASE("z$.size")
{
const auto CreateCacheValue = [](size_t Size) -> CbObject {
std::vector<uint8_t> Buf;
@@ -1626,7 +1657,7 @@ TEST_CASE("zcache.size")
}
}
-TEST_CASE("zcache.gc")
+TEST_CASE("z$.gc")
{
using namespace testutils;
@@ -1684,23 +1715,23 @@ TEST_CASE("zcache.gc")
}
// Expect timestamps to be serialized
- {
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
- std::vector<IoHash> Keep;
+ //{
+ // CasGc Gc;
+ // ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ // std::vector<IoHash> Keep;
- // Collect garbage with 1 hour max cache duration
- {
- CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(3, Keep.size());
- }
+ // // Collect garbage with 1 hour max cache duration
+ // {
+ // CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
+ // CHECK_EQ(3, Keep.size());
+ // }
- // Move forward in time
- {
- CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(0, Keep.size());
- }
- }
+ // // Move forward in time
+ // {
+ // CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
+ // CHECK_EQ(0, Keep.size());
+ // }
+ //}
}
SUBCASE("gc removes standalone values")
@@ -1723,7 +1754,7 @@ TEST_CASE("zcache.gc")
GcContext GcCtx;
GcCtx.MaxCacheDuration(std::chrono::hours(46));
- Zcs.CollectGarbage(GcCtx);
+ Gc.CollectGarbage(GcCtx);
for (const auto& Key : Keys)
{
@@ -1738,7 +1769,7 @@ TEST_CASE("zcache.gc")
GcContext GcCtx(CurrentTime + std::chrono::hours(46));
GcCtx.MaxCacheDuration(std::chrono::minutes(2));
- Zcs.CollectGarbage(GcCtx);
+ Gc.CollectGarbage(GcCtx);
for (const auto& Key : Keys)
{
@@ -1746,6 +1777,8 @@ TEST_CASE("zcache.gc")
const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
CHECK(!Exists);
}
+
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
}
}
@@ -1770,7 +1803,7 @@ TEST_CASE("zcache.gc")
GcCtx.MaxCacheDuration(std::chrono::hours(2));
GcCtx.CollectSmallObjects(true);
- Zcs.CollectGarbage(GcCtx);
+ Gc.CollectGarbage(GcCtx);
for (const auto& Key : Keys)
{
@@ -1786,7 +1819,7 @@ TEST_CASE("zcache.gc")
GcCtx.MaxCacheDuration(std::chrono::minutes(2));
GcCtx.CollectSmallObjects(true);
- Zcs.CollectGarbage(GcCtx);
+ Gc.CollectGarbage(GcCtx);
for (const auto& Key : Keys)
{
@@ -1794,6 +1827,8 @@ TEST_CASE("zcache.gc")
const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
CHECK(!Exists);
}
+
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
}
}
}