aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-12 22:19:39 +0200
committerGitHub <[email protected]>2022-04-12 22:19:39 +0200
commite20b70dd594108c21e8bff0dd813dd8dacdf007b (patch)
tree3a7826231931df259f5feda355d92dcc850628b1 /zenstore/compactcas.cpp
parentreduce number of chunks in compactcas.threadedinsert (diff)
parentwait until work is completed, not just picked up (diff)
downloadzen-e20b70dd594108c21e8bff0dd813dd8dacdf007b.tar.xz
zen-e20b70dd594108c21e8bff0dd813dd8dacdf007b.zip
Merge pull request #72 from EpicGames/de/set-ulimit
attempt to change the maximum number of files open at startup
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp35
1 file changed, 23 insertions, 12 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index f89108dc4..920ed965f 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -2272,8 +2272,9 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
CreateDirectories(CasConfig.RootDirectory);
- const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 4096;
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 4096;
+ uint64_t ExpectedSize = 0;
std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
Chunks.reserve(kChunkCount);
@@ -2289,10 +2290,12 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
continue;
}
Chunks[Hash] = Chunk;
+ ExpectedSize += Chunk.Size();
break;
}
}
+ std::atomic<size_t> WorkCompleted = 0;
WorkerThreadPool ThreadPool(4);
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
@@ -2302,31 +2305,34 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
{
const IoHash& Hash = Chunk.first;
const IoBuffer& Buffer = Chunk.second;
- ThreadPool.ScheduleWork([&Cas, Buffer, Hash]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
ZEN_ASSERT(InsertResult.New);
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < Chunks.size())
{
Sleep(1);
}
}
+ WorkCompleted = 0;
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ CHECK_EQ(ExpectedSize, TotalSize);
{
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &Chunk]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
IoHash ChunkHash = Chunk.first;
IoBuffer Buffer = Cas.FindChunk(ChunkHash);
IoHash Hash = IoHash::HashBuffer(Buffer);
CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < Chunks.size())
{
Sleep(1);
}
@@ -2339,6 +2345,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
GcChunkHashes.insert(Chunk.first);
}
{
+ WorkCompleted = 0;
std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
NewChunks.reserve(kChunkCount);
@@ -2353,20 +2360,22 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Cas, Chunk, &AddedChunkCount]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
Cas.InsertChunk(Chunk.second, Chunk.first);
AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
});
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, Chunk]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
IoHash ChunkHash = Chunk.first;
IoBuffer Buffer = Cas.FindChunk(ChunkHash);
if (Buffer)
{
CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
}
+ WorkCompleted.fetch_add(1);
});
}
@@ -2408,7 +2417,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < NewChunks.size() + Chunks.size())
{
Sleep(1);
}
@@ -2449,14 +2458,16 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
{
+ WorkCompleted = 0;
for (const IoHash& ChunkHash : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Cas, ChunkHash]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
CHECK(Cas.HaveChunk(ChunkHash));
CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < GcChunkHashes.size())
{
Sleep(1);
}