aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-12 15:42:40 +0200
committerGitHub <[email protected]>2022-04-12 15:42:40 +0200
commitda93b5928956593e85f07ab8df28fd8940153c06 (patch)
tree5458f8b62f6d5f424fd6c5c9344696d65e588372 /zenstore/compactcas.cpp
parentMac fix (diff)
parentremove unused variable (diff)
downloadzen-da93b5928956593e85f07ab8df28fd8940153c06.tar.xz
zen-da93b5928956593e85f07ab8df28fd8940153c06.zip
Merge pull request #71 from EpicGames/de/fix-compact-cas-threaded-test
Fix failing test due to not generating unique test data
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp140
1 files changed, 70 insertions, 70 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 4297c41e7..dcac5f071 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -2275,17 +2275,22 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
const uint64_t kChunkSize = 1048;
const int32_t kChunkCount = 8192;
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(kChunkCount);
- std::vector<IoBuffer> Chunks;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
Chunks.reserve(kChunkCount);
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
- IoBuffer Chunk = CreateChunk(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- ChunkHashes.emplace_back(Hash);
- Chunks.emplace_back(Chunk);
+ while (true)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = Chunk;
+ break;
+ }
}
WorkerThreadPool ThreadPool(4);
@@ -2293,12 +2298,12 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
CasContainerStrategy Cas(CasConfig, Gc);
Cas.Initialize("test", 32768, 16, true);
{
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ for (const auto& Chunk : Chunks)
{
- const IoBuffer& Chunk = Chunks[Idx];
- const IoHash& Hash = ChunkHashes[Idx];
- ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ const IoHash& Hash = Chunk.first;
+ const IoBuffer& Buffer = Chunk.second;
+ ThreadPool.ScheduleWork([&Cas, Buffer, Hash]() {
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
ZEN_ASSERT(InsertResult.New);
});
}
@@ -2312,13 +2317,12 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
{
- std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() {
- IoHash ChunkHash = OldChunkHashes[Idx];
- IoBuffer Chunk = Cas.FindChunk(ChunkHash);
- IoHash Hash = IoHash::HashBuffer(Chunk);
+ ThreadPool.ScheduleWork([&Cas, &Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ IoHash Hash = IoHash::HashBuffer(Buffer);
CHECK(ChunkHash == Hash);
});
}
@@ -2328,57 +2332,52 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
}
}
- std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes;
+ GcChunkHashes.reserve(Chunks.size());
+ for (const auto& Chunk : Chunks)
{
- std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
- std::vector<IoHash> NewChunkHashes;
- NewChunkHashes.reserve(kChunkCount);
- std::vector<IoBuffer> NewChunks;
+ GcChunkHashes.insert(Chunk.first);
+ }
+ {
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
NewChunks.reserve(kChunkCount);
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
- IoBuffer Chunk = CreateChunk(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- NewChunkHashes.emplace_back(Hash);
- NewChunks.emplace_back(Chunk);
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = Chunk;
}
- RwLock ChunkHashesLock;
std::atomic_uint32_t AddedChunkCount;
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ for (const auto& Chunk : NewChunks)
{
- const IoBuffer& Chunk = NewChunks[Idx];
- const IoHash& Hash = NewChunkHashes[Idx];
- ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
- ZEN_ASSERT(InsertResult.New);
+ ThreadPool.ScheduleWork([&Cas, Chunk, &AddedChunkCount]() {
+ Cas.InsertChunk(Chunk.second, Chunk.first);
AddedChunkCount.fetch_add(1);
});
- ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() {
- IoHash ChunkHash = OldChunkHashes[Idx];
- IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]);
- if (Chunk)
+ }
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Cas, Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ if (Buffer)
{
- CHECK(ChunkHash == IoHash::HashBuffer(Chunk));
+ CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
}
});
}
- while (AddedChunkCount.load() < kChunkCount)
+ while (AddedChunkCount.load() < NewChunks.size())
{
- std::vector<IoHash> AddedHashes;
- {
- RwLock::ExclusiveLockScope _(ChunkHashesLock);
- AddedHashes.swap(NewChunkHashes);
- }
// Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const IoHash& ChunkHash : AddedHashes)
+ for (const auto& Chunk : NewChunks)
{
- if (Cas.HaveChunk(ChunkHash))
+ if (Cas.HaveChunk(Chunk.first))
{
- GcChunkHashes.emplace(ChunkHash);
+ GcChunkHashes.emplace(Chunk.first);
}
}
std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
@@ -2414,39 +2413,40 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
Sleep(1);
}
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
{
- std::vector<IoHash> AddedHashes;
+ if (Cas.HaveChunk(Chunk.first))
{
- RwLock::ExclusiveLockScope _(ChunkHashesLock);
- AddedHashes.swap(NewChunkHashes);
+ GcChunkHashes.emplace(Chunk.first);
}
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const IoHash& ChunkHash : AddedHashes)
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
{
- if (Cas.HaveChunk(ChunkHash))
+ if (C < KeepHashes.size() - 1)
{
- GcChunkHashes.emplace(ChunkHash);
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
}
- }
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 77 == 0 && C < KeepHashes.size() - 1)
+ if (C + 3 < KeepHashes.size() - 1)
{
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
KeepHashes.pop_back();
}
- C++;
}
-
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
- Cas.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ C++;
}
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
{
for (const IoHash& ChunkHash : GcChunkHashes)