From 4e6abaacede5bb6dea1ff788b1259efeb1212bcc Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 12 Apr 2022 13:58:38 +0200 Subject: Add z$.threadedinsert test --- zenserver/cache/structuredcachestore.cpp | 205 ++++++++++++++++++------------- 1 file changed, 118 insertions(+), 87 deletions(-) (limited to 'zenserver/cache/structuredcachestore.cpp') diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index d28964502..0ce473e89 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -604,7 +604,7 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue m_CacheMap.insert_or_assign(HashKey, BucketValue(Value.Value, GcClock::TickCount())); } - m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed); + m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order_seq_cst); } ////////////////////////////////////////////////////////////////////////// @@ -1303,7 +1303,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is for (const auto& Entry : m_Index) { const DiskLocation& Location = Entry.second.Location; - m_TotalSize.fetch_add(Location.Size(), std::memory_order_release); + m_TotalSize.fetch_add(Location.Size(), std::memory_order_seq_cst); if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { continue; @@ -1524,7 +1524,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& WriteBlock->Write(Value.Value.Data(), ChunkSize, InsertOffset); m_SlogFile.Append(DiskIndexEntry); - m_TotalSize.fetch_add(ChunkSize, std::memory_order::relaxed); + m_TotalSize.fetch_add(ChunkSize, std::memory_order_seq_cst); { RwLock::ExclusiveLockScope __(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) @@ -1908,10 +1908,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); m_Index.insert({Key, {Loc, GcClock::TickCount()}}); - m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order_seq_cst); continue; } - m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order_seq_cst); DeletedSize += Entry.Location.Size(); DeletedCount++; } @@ -2043,7 +2043,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { auto KeyIt = m_Index.find(Entry.Key); uint64_t ChunkSize = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment).Size; - m_TotalSize.fetch_sub(ChunkSize); + m_TotalSize.fetch_sub(ChunkSize, std::memory_order_seq_cst); m_Index.erase(KeyIt); continue; } @@ -2364,7 +2364,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order_seq_cst); } ////////////////////////////////////////////////////////////////////////// @@ -3105,47 +3105,51 @@ TEST_CASE("z$.legacyconversion") } } -# if 0 TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - - CreateDirectories(CasConfig.RootDirectory); - const uint64_t kChunkSize = 1048; const int32_t kChunkCount = 8192; - std::vector ChunkHashes; - ChunkHashes.reserve(kChunkCount); - std::vector Chunks; + struct Chunk + { + std::string Bucket; + IoBuffer Buffer; + }; + std::unordered_map Chunks; Chunks.reserve(kChunkCount); + const std::string Bucket1 = "rightinone"; + const std::string Bucket2 = "rightintwo"; + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - ChunkHashes.emplace_back(Hash); - Chunks.emplace_back(Chunk); + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + } + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + } } - WorkerThreadPool ThreadPool(4); - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 32768, 16, true); + CreateDirectories(TempDir.Path()); + + WorkerThreadPool ThreadPool(4); + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); + const GcClock::TimePoint CurrentTime = GcClock::Now(); + { - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + for (const auto& Chunk : Chunks) { - const IoBuffer& Chunk = Chunks[Idx]; - const IoHash& Hash = ChunkHashes[Idx]; - ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); - }); + ThreadPool.ScheduleWork([&Zcs, &Chunk]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); }); } while (ThreadPool.PendingWork() > 0) { @@ -3153,17 +3157,19 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + const uint64_t TotalSize = Zcs.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * Chunks.size(), TotalSize); { - std::vector OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { - IoHash ChunkHash = OldChunkHashes[Idx]; - IoBuffer Chunk = Cas.FindChunk(ChunkHash); - IoHash Hash = IoHash::HashBuffer(Chunk); + ThreadPool.ScheduleWork([&Zcs, &Chunk]() { + std::string Bucket = Chunk.second.Bucket; + IoHash ChunkHash = Chunk.first; + ZenCacheValue CacheValue; + + CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); + IoHash Hash = IoHash::HashBuffer(CacheValue.Value); CHECK(ChunkHash == Hash); }); } @@ -3172,62 +3178,73 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) Sleep(1); } } - - std::unordered_set GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + std::unordered_map GcChunkHashes; + GcChunkHashes.reserve(Chunks.size()); + for (const auto& Chunk : Chunks) { - std::vector OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); - std::vector NewChunkHashes; - NewChunkHashes.reserve(kChunkCount); - std::vector NewChunks; - NewChunks.reserve(kChunkCount); + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + { + std::unordered_map NewChunks; for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - IoBuffer Chunk = CreateChunk(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - NewChunkHashes.emplace_back(Hash); - NewChunks.emplace_back(Chunk); + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + } + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + } } RwLock ChunkHashesLock; std::atomic_uint32_t AddedChunkCount; - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + for (const auto& Chunk : NewChunks) { - const IoBuffer& Chunk = NewChunks[Idx]; - const IoHash& Hash = NewChunkHashes[Idx]; - ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); + ThreadPool.ScheduleWork([&Zcs, Chunk, &AddedChunkCount]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); AddedChunkCount.fetch_add(1); }); - ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { - IoHash ChunkHash = OldChunkHashes[Idx]; - IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]); - if (Chunk) + } + + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, Chunk]() { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { - CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); } }); } - - while (AddedChunkCount.load() < kChunkCount) + while (AddedChunkCount.load() < NewChunks.size()) { - std::vector AddedHashes; + std::unordered_map AddedChunks; { RwLock::ExclusiveLockScope _(ChunkHashesLock); - AddedHashes.swap(NewChunkHashes); + AddedChunks.swap(NewChunks); } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const IoHash& ChunkHash : AddedHashes) + for (const auto& Chunk : AddedChunks) { - if (Cas.HaveChunk(ChunkHash)) + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { - GcChunkHashes.emplace(ChunkHash); + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; } } - std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; + std::vector KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; while (C < KeepHashes.size()) { if (C % 155 == 0) @@ -3249,7 +3266,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepHashes); - Cas.CollectGarbage(GcCtx); + Zcs.CollectGarbage(GcCtx); CasChunkSet& Deleted = GcCtx.DeletedCas(); Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } @@ -3260,27 +3277,41 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } { - std::vector AddedHashes; + std::unordered_map AddedChunks; { RwLock::ExclusiveLockScope _(ChunkHashesLock); - AddedHashes.swap(NewChunkHashes); + AddedChunks.swap(NewChunks); } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const IoHash& ChunkHash : AddedHashes) + for (const auto& Chunk : AddedChunks) { - if (Cas.HaveChunk(ChunkHash)) + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { - GcChunkHashes.emplace(ChunkHash); + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; } } - std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; + std::vector KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; while (C < KeepHashes.size()) { - if (C % 77 == 0 && C < KeepHashes.size() - 1) + if (C % 155 == 0) { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } } C++; } @@ -3288,17 +3319,18 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepHashes); - Cas.CollectGarbage(GcCtx); + Zcs.CollectGarbage(GcCtx); CasChunkSet& Deleted = GcCtx.DeletedCas(); Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } } { - for (const IoHash& ChunkHash : GcChunkHashes) + for (const auto& Chunk : GcChunkHashes) { - ThreadPool.ScheduleWork([&Cas, ChunkHash]() { - CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + ThreadPool.ScheduleWork([&Zcs, Chunk]() { + ZenCacheValue CacheValue; + CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); }); } while (ThreadPool.PendingWork() > 0) @@ -3308,7 +3340,6 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } } -# endif #endif -- cgit v1.2.3