diff options
| author | Dan Engelbrecht <[email protected]> | 2022-03-22 12:37:28 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-03-31 11:28:33 +0200 |
| commit | e1a9bcaee2e7cf0d9fb9d31c619c26676bd80db8 (patch) | |
| tree | 7a8e98011a9e02b6bb8593aabe65002dedce345a /zenstore/compactcas.cpp | |
| parent | remove test code (diff) | |
| download | zen-e1a9bcaee2e7cf0d9fb9d31c619c26676bd80db8.tar.xz zen-e1a9bcaee2e7cf0d9fb9d31c619c26676bd80db8.zip | |
compactcas.threadedinsert test case
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 174 |
1 file changed, 174 insertions, 0 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index e0a0dcd00..4a98fa87c 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -8,6 +8,7 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/workthreadpool.h> #include <gsl/gsl-lite.hpp> #if ZEN_WITH_TESTS @@ -1957,6 +1958,179 @@ TEST_CASE("compactcas.legacyconversion") } } +TEST_CASE("compactcas.threadedinsert") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 8192; + + std::unordered_set<IoHash, IoHash::Hasher> ChunkHashes; + ChunkHashes.reserve(kChunkCount); + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 32768, 16, true); + { + WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little + RwLock ChunkHashesLock; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + ThreadPool.ScheduleWork([&Cas, kChunkSize, &ChunkHashesLock, &ChunkHashes]() { + IoBuffer Chunk = CreateChunk(kChunkSize); + const IoHash Hash = HashBuffer(Chunk); + auto InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + RwLock::ExclusiveLockScope _(ChunkHashesLock); + ChunkHashes.insert(Hash); + }); + } + ThreadPool.Flush(); + } + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + + { + WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little + + std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { + auto ChunkHash = OldChunkHashes[Idx]; + auto Chunk = Cas.FindChunk(ChunkHash); + auto Hash = IoHash::HashBuffer(Chunk); + CHECK(ChunkHash == Hash); + }); + } + 
ThreadPool.Flush(); + } + + std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + { + std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + std::vector<IoHash> NewChunkHashes; + NewChunkHashes.reserve(kChunkCount); + + WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little + RwLock ChunkHashesLock; + std::atomic_uint32_t AddedChunkCount; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + ThreadPool.ScheduleWork([&Cas, kChunkSize, &ChunkHashesLock, &NewChunkHashes, &AddedChunkCount]() { + IoBuffer Chunk = CreateChunk(kChunkSize); + const IoHash Hash = HashBuffer(Chunk); + auto InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + NewChunkHashes.emplace_back(Hash); + } + AddedChunkCount.fetch_add(1); + }); + ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + auto Chunk = Cas.FindChunk(OldChunkHashes[Idx]); + if (Chunk) + { + CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); + } + }); + } + + while (AddedChunkCount.load() < kChunkCount) + { + std::vector<IoHash> AddedHashes; + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (auto ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + int32_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 3 == 0 && C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + 
Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + ThreadPool.Flush(); + + { + std::vector<IoHash> AddedHashes; + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (auto ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + int32_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 3 == 0 && C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + } + { + WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little + + for (IoHash ChunkHash : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Cas, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + }); + } + ThreadPool.Flush(); + } +} + #endif void |