aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-12 13:58:38 +0200
committerDan Engelbrecht <[email protected]>2022-04-12 22:20:47 +0200
commit4e6abaacede5bb6dea1ff788b1259efeb1212bcc (patch)
treee6594a11b08d06a916812f5ac5fe4d4af11147cb /zenserver/cache/structuredcachestore.cpp
parentcorrect expire vs contribute (diff)
downloadzen-4e6abaacede5bb6dea1ff788b1259efeb1212bcc.tar.xz
zen-4e6abaacede5bb6dea1ff788b1259efeb1212bcc.zip
Add z$.threadedinsert test
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp205
1 files changed, 118 insertions, 87 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index d28964502..0ce473e89 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -604,7 +604,7 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
m_CacheMap.insert_or_assign(HashKey, BucketValue(Value.Value, GcClock::TickCount()));
}
- m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed);
+ m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order_seq_cst);
}
//////////////////////////////////////////////////////////////////////////
@@ -1303,7 +1303,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
for (const auto& Entry : m_Index)
{
const DiskLocation& Location = Entry.second.Location;
- m_TotalSize.fetch_add(Location.Size(), std::memory_order_release);
+ m_TotalSize.fetch_add(Location.Size(), std::memory_order_seq_cst);
if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
continue;
@@ -1524,7 +1524,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
WriteBlock->Write(Value.Value.Data(), ChunkSize, InsertOffset);
m_SlogFile.Append(DiskIndexEntry);
- m_TotalSize.fetch_add(ChunkSize, std::memory_order::relaxed);
+ m_TotalSize.fetch_add(ChunkSize, std::memory_order_seq_cst);
{
RwLock::ExclusiveLockScope __(m_IndexLock);
if (auto It = m_Index.find(HashKey); It != m_Index.end())
@@ -1908,10 +1908,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
}
m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
m_Index.insert({Key, {Loc, GcClock::TickCount()}});
- m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order_seq_cst);
continue;
}
- m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order_seq_cst);
DeletedSize += Entry.Location.Size();
DeletedCount++;
}
@@ -2043,7 +2043,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
auto KeyIt = m_Index.find(Entry.Key);
uint64_t ChunkSize = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment).Size;
- m_TotalSize.fetch_sub(ChunkSize);
+ m_TotalSize.fetch_sub(ChunkSize, std::memory_order_seq_cst);
m_Index.erase(KeyIt);
continue;
}
@@ -2364,7 +2364,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ m_TotalSize.fetch_add(Loc.Size(), std::memory_order_seq_cst);
}
//////////////////////////////////////////////////////////////////////////
@@ -3105,47 +3105,51 @@ TEST_CASE("z$.legacyconversion")
}
}
-# if 0
TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
{
// for (uint32_t i = 0; i < 100; ++i)
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
-
- CreateDirectories(CasConfig.RootDirectory);
-
const uint64_t kChunkSize = 1048;
const int32_t kChunkCount = 8192;
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(kChunkCount);
- std::vector<IoBuffer> Chunks;
+ struct Chunk
+ {
+ std::string Bucket;
+ IoBuffer Buffer;
+ };
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks;
Chunks.reserve(kChunkCount);
+ const std::string Bucket1 = "rightinone";
+ const std::string Bucket2 = "rightintwo";
+
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
- IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- ChunkHashes.emplace_back(Hash);
- Chunks.emplace_back(Chunk);
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ }
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ }
}
- WorkerThreadPool ThreadPool(4);
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 32768, 16, true);
+ CreateDirectories(TempDir.Path());
+
+ WorkerThreadPool ThreadPool(4);
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, TempDir.Path());
+ const GcClock::TimePoint CurrentTime = GcClock::Now();
+
{
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ for (const auto& Chunk : Chunks)
{
- const IoBuffer& Chunk = Chunks[Idx];
- const IoHash& Hash = ChunkHashes[Idx];
- ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
- ZEN_ASSERT(InsertResult.New);
- });
+ ThreadPool.ScheduleWork([&Zcs, &Chunk]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); });
}
while (ThreadPool.PendingWork() > 0)
{
@@ -3153,17 +3157,19 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
}
}
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * Chunks.size(), TotalSize);
{
- std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() {
- IoHash ChunkHash = OldChunkHashes[Idx];
- IoBuffer Chunk = Cas.FindChunk(ChunkHash);
- IoHash Hash = IoHash::HashBuffer(Chunk);
+ ThreadPool.ScheduleWork([&Zcs, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
+
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
CHECK(ChunkHash == Hash);
});
}
@@ -3172,62 +3178,73 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
Sleep(1);
}
}
-
- std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
+ GcChunkHashes.reserve(Chunks.size());
+ for (const auto& Chunk : Chunks)
{
- std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
- std::vector<IoHash> NewChunkHashes;
- NewChunkHashes.reserve(kChunkCount);
- std::vector<IoBuffer> NewChunks;
- NewChunks.reserve(kChunkCount);
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ {
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks;
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
- IoBuffer Chunk = CreateChunk(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- NewChunkHashes.emplace_back(Hash);
- NewChunks.emplace_back(Chunk);
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ }
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ }
}
RwLock ChunkHashesLock;
std::atomic_uint32_t AddedChunkCount;
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ for (const auto& Chunk : NewChunks)
{
- const IoBuffer& Chunk = NewChunks[Idx];
- const IoHash& Hash = NewChunkHashes[Idx];
- ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
- ZEN_ASSERT(InsertResult.New);
+ ThreadPool.ScheduleWork([&Zcs, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
AddedChunkCount.fetch_add(1);
});
- ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() {
- IoHash ChunkHash = OldChunkHashes[Idx];
- IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]);
- if (Chunk)
+ }
+
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
{
- CHECK(ChunkHash == IoHash::HashBuffer(Chunk));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
}
});
}
-
- while (AddedChunkCount.load() < kChunkCount)
+ while (AddedChunkCount.load() < NewChunks.size())
{
- std::vector<IoHash> AddedHashes;
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> AddedChunks;
{
RwLock::ExclusiveLockScope _(ChunkHashesLock);
- AddedHashes.swap(NewChunkHashes);
+ AddedChunks.swap(NewChunks);
}
// Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const IoHash& ChunkHash : AddedHashes)
+ for (const auto& Chunk : AddedChunks)
{
- if (Cas.HaveChunk(ChunkHash))
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
{
- GcChunkHashes.emplace(ChunkHash);
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
}
}
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
- size_t C = 0;
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
while (C < KeepHashes.size())
{
if (C % 155 == 0)
@@ -3249,7 +3266,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
GcCtx.ContributeCas(KeepHashes);
- Cas.CollectGarbage(GcCtx);
+ Zcs.CollectGarbage(GcCtx);
CasChunkSet& Deleted = GcCtx.DeletedCas();
Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
@@ -3260,27 +3277,41 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
}
{
- std::vector<IoHash> AddedHashes;
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> AddedChunks;
{
RwLock::ExclusiveLockScope _(ChunkHashesLock);
- AddedHashes.swap(NewChunkHashes);
+ AddedChunks.swap(NewChunks);
}
// Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const IoHash& ChunkHash : AddedHashes)
+ for (const auto& Chunk : AddedChunks)
{
- if (Cas.HaveChunk(ChunkHash))
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
{
- GcChunkHashes.emplace(ChunkHash);
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
}
}
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
- size_t C = 0;
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
while (C < KeepHashes.size())
{
- if (C % 77 == 0 && C < KeepHashes.size() - 1)
+ if (C % 155 == 0)
{
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
}
C++;
}
@@ -3288,17 +3319,18 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
GcCtx.ContributeCas(KeepHashes);
- Cas.CollectGarbage(GcCtx);
+ Zcs.CollectGarbage(GcCtx);
CasChunkSet& Deleted = GcCtx.DeletedCas();
Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
}
{
- for (const IoHash& ChunkHash : GcChunkHashes)
+ for (const auto& Chunk : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Cas, ChunkHash]() {
- CHECK(Cas.HaveChunk(ChunkHash));
- CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ ThreadPool.ScheduleWork([&Zcs, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
});
}
while (ThreadPool.PendingWork() > 0)
@@ -3308,7 +3340,6 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
}
}
}
-# endif
#endif