aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-03-22 12:37:28 +0100
committerDan Engelbrecht <[email protected]>2022-03-31 11:28:33 +0200
commite1a9bcaee2e7cf0d9fb9d31c619c26676bd80db8 (patch)
tree7a8e98011a9e02b6bb8593aabe65002dedce345a /zenstore/compactcas.cpp
parentremove test code (diff)
downloadzen-e1a9bcaee2e7cf0d9fb9d31c619c26676bd80db8.tar.xz
zen-e1a9bcaee2e7cf0d9fb9d31c619c26676bd80db8.zip
compactcas.threadedinsert test case
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp174
1 file changed, 174 insertions, 0 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index e0a0dcd00..4a98fa87c 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -8,6 +8,7 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/workthreadpool.h>
#include <gsl/gsl-lite.hpp>
#if ZEN_WITH_TESTS
@@ -1957,6 +1958,179 @@ TEST_CASE("compactcas.legacyconversion")
}
}
+TEST_CASE("compactcas.threadedinsert")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 8192;
+
+ std::unordered_set<IoHash, IoHash::Hasher> ChunkHashes;
+ ChunkHashes.reserve(kChunkCount);
+
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 32768, 16, true);
+ {
+ WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little
+ RwLock ChunkHashesLock;
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ ThreadPool.ScheduleWork([&Cas, kChunkSize, &ChunkHashesLock, &ChunkHashes]() {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ const IoHash Hash = HashBuffer(Chunk);
+ auto InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ ChunkHashes.insert(Hash);
+ });
+ }
+ ThreadPool.Flush();
+ }
+
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+
+ {
+ WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little
+
+ std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() {
+ auto ChunkHash = OldChunkHashes[Idx];
+ auto Chunk = Cas.FindChunk(ChunkHash);
+ auto Hash = IoHash::HashBuffer(Chunk);
+ CHECK(ChunkHash == Hash);
+ });
+ }
+ ThreadPool.Flush();
+ }
+
+ std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ {
+ std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ std::vector<IoHash> NewChunkHashes;
+ NewChunkHashes.reserve(kChunkCount);
+
+ WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little
+ RwLock ChunkHashesLock;
+ std::atomic_uint32_t AddedChunkCount;
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ ThreadPool.ScheduleWork([&Cas, kChunkSize, &ChunkHashesLock, &NewChunkHashes, &AddedChunkCount]() {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ const IoHash Hash = HashBuffer(Chunk);
+ auto InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ {
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ NewChunkHashes.emplace_back(Hash);
+ }
+ AddedChunkCount.fetch_add(1);
+ });
+ ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() {
+ IoHash ChunkHash = OldChunkHashes[Idx];
+ auto Chunk = Cas.FindChunk(OldChunkHashes[Idx]);
+ if (Chunk)
+ {
+ CHECK(ChunkHash == IoHash::HashBuffer(Chunk));
+ }
+ });
+ }
+
+ while (AddedChunkCount.load() < kChunkCount)
+ {
+ std::vector<IoHash> AddedHashes;
+ {
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ AddedHashes.swap(NewChunkHashes);
+ }
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (auto ChunkHash : AddedHashes)
+ {
+ if (Cas.HaveChunk(ChunkHash))
+ {
+ GcChunkHashes.emplace(ChunkHash);
+ }
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ int32_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 3 == 0 && C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+
+ ThreadPool.Flush();
+
+ {
+ std::vector<IoHash> AddedHashes;
+ {
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ AddedHashes.swap(NewChunkHashes);
+ }
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (auto ChunkHash : AddedHashes)
+ {
+ if (Cas.HaveChunk(ChunkHash))
+ {
+ GcChunkHashes.emplace(ChunkHash);
+ }
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ int32_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 3 == 0 && C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+ }
+ {
+ WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little
+
+ for (IoHash ChunkHash : GcChunkHashes)
+ {
+ ThreadPool.ScheduleWork([&Cas, ChunkHash]() {
+ CHECK(Cas.HaveChunk(ChunkHash));
+ CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ });
+ }
+ ThreadPool.Flush();
+ }
+}
+
#endif
void