aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-25 14:49:04 +0100
committerGitHub Enterprise <[email protected]>2024-11-25 14:49:04 +0100
commitbcb81b326a373aa86d7e6a046febc8ba74f21c04 (patch)
treeb20c6d59cefd299b4daac0754c8fab7ec7019b9c /src
parentstronger validation of payload existance (#229) (diff)
downloadzen-bcb81b326a373aa86d7e6a046febc8ba74f21c04.tar.xz
zen-bcb81b326a373aa86d7e6a046febc8ba74f21c04.zip
caller controls threshold for bulk-loading chunks in IterateChunks (#222)
* Allow caller to control threshold for bulk-loading chunks in IterateChunks * use smaller batch chunk reading for /fileinfos and /chunkinfos as we do not intend to read the payload * use smaller batch read buffer when just querying for size of attachments
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp6
-rw-r--r--src/zenserver/projectstore/projectstore.cpp17
-rw-r--r--src/zenserver/projectstore/projectstore.h6
-rw-r--r--src/zenstore/blockstore.cpp3
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp5
-rw-r--r--src/zenstore/cas.cpp12
-rw-r--r--src/zenstore/cas.h3
-rw-r--r--src/zenstore/cidstore.cpp10
-rw-r--r--src/zenstore/compactcas.cpp8
-rw-r--r--src/zenstore/compactcas.h3
-rw-r--r--src/zenstore/include/zenstore/blockstore.h2
-rw-r--r--src/zenstore/include/zenstore/cidstore.h3
12 files changed, 50 insertions, 28 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 551b5a76d..f49f6a645 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -672,7 +672,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque
AttachmentsSize += Payload.GetSize();
return true;
},
- &WorkerPool);
+ &WorkerPool,
+ 8u * 1024u);
ResponseWriter << "Count" << AllAttachments.size();
ResponseWriter << "Size" << AttachmentsSize;
@@ -765,7 +766,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
AttachmentsSize += Payload.GetSize();
return true;
},
- &WorkerPool);
+ &WorkerPool,
+ 8u * 1024u);
ResponseWriter << "AttachmentsSize" << AttachmentsSize;
}
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index a43c9e0e2..3bcc3c51d 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1922,15 +1922,17 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash)
bool
ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
- return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool);
+ return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool, LargeSizeLimit);
}
bool
ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
std::vector<size_t> CidChunkIndexes;
std::vector<IoHash> CidChunkHashes;
@@ -1961,7 +1963,8 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
m_CidStore.IterateChunks(
CidChunkHashes,
[&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); },
- OptionalWorkerPool);
+ OptionalWorkerPool,
+ LargeSizeLimit);
if (OptionalWorkerPool)
{
@@ -3995,7 +3998,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
}
return true;
},
- &GetSmallWorkerPool(EWorkloadType::Burst));
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 8u * 1024u);
}
CbObjectWriter Response;
@@ -4149,7 +4153,8 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
}
return true;
},
- &WorkerPool);
+ &WorkerPool,
+ 8u * 1024u);
}
CbObjectWriter Response;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index d8b13585b..860f2c17d 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -118,10 +118,12 @@ public:
IoBuffer GetChunkByRawHash(const IoHash& RawHash);
bool IterateChunks(std::span<IoHash> RawHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool);
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
bool IterateChunks(std::span<Oid> ChunkIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool);
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
inline static const uint32_t kInvalidOp = ~0u;
/** Persist a new oplog entry
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 8a6d9c18d..fcf934344 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1634,7 +1634,8 @@ TEST_CASE("blockstore.iterate.chunks")
break;
}
return true;
- });
+ },
+ 0);
CHECK(Continue);
});
return true;
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 93b639a51..3046ab87c 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -2213,7 +2213,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
};
m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
- return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk);
+ return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
});
}
catch (ScrubDeadlineExpiredException&)
@@ -3258,7 +3258,8 @@ ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger,
ZEN_UNUSED(ChunkIndex);
CaptureAttachments(ChunkIndex, File.GetChunk(Offset, Size).GetView());
return !IsCancelledFlag.load();
- });
+ },
+ 0);
if (Continue)
{
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 60dc9a505..4c1836cf7 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -59,7 +59,8 @@ public:
virtual void FilterChunks(HashKeySet& InOutChunks) override;
virtual bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool) override;
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit) override;
virtual void Flush() override;
virtual void ScrubStorage(ScrubContext& Ctx) override;
virtual CidStoreSize TotalSize() const override;
@@ -392,7 +393,8 @@ CasImpl::FilterChunks(HashKeySet& InOutChunks)
bool
CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
ZEN_TRACE_CPU("CAS::IterateChunks");
if (!m_SmallStrategy.IterateChunks(
@@ -402,7 +404,8 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
Chunk.SetContentType(ZenContentType::kCompressedBinary);
return AsyncCallback(Index, Payload);
},
- OptionalWorkerPool))
+ OptionalWorkerPool,
+ LargeSizeLimit))
{
return false;
}
@@ -413,7 +416,8 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
Chunk.SetContentType(ZenContentType::kCompressedBinary);
return AsyncCallback(Index, Payload);
},
- OptionalWorkerPool))
+ OptionalWorkerPool,
+ LargeSizeLimit))
{
return false;
}
diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h
index bedbc6a9a..e279dd2cc 100644
--- a/src/zenstore/cas.h
+++ b/src/zenstore/cas.h
@@ -47,7 +47,8 @@ public:
virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
virtual bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool) = 0;
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit) = 0;
virtual void Flush() = 0;
virtual void ScrubStorage(ScrubContext& Ctx) = 0;
virtual CidStoreSize TotalSize() const = 0;
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index 71fd596f4..2ab769d04 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -119,9 +119,10 @@ struct CidStore::Impl
bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
- return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool);
+ return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit);
}
void Flush() { m_CasStore.Flush(); }
@@ -217,9 +218,10 @@ CidStore::ContainsChunk(const IoHash& DecompressedId)
bool
CidStore::IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
- return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool);
+ return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit);
}
void
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index bc30301d1..792854af6 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -305,7 +305,8 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
bool
CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool)
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
if (ChunkHashes.size() < 3)
{
@@ -344,7 +345,8 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
},
[&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
- });
+ },
+ LargeSizeLimit);
};
Latch WorkLatch(1);
@@ -498,7 +500,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
};
m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
- return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk);
+ return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
});
}
catch (const ScrubDeadlineExpiredException&)
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
index 44567e7a0..07e620086 100644
--- a/src/zenstore/compactcas.h
+++ b/src/zenstore/compactcas.h
@@ -58,7 +58,8 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore
void FilterChunks(HashKeySet& InOutChunks);
bool IterateChunks(std::span<IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool);
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
void Initialize(const std::filesystem::path& RootDirectory,
const std::string_view ContainerBaseName,
uint32_t MaxBlockSize,
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index d4c2be73f..8f8f2ccd7 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -167,7 +167,7 @@ public:
std::span<const size_t> ChunkIndexes,
const IterateChunksSmallSizeCallback& SmallSizeCallback,
const IterateChunksLargeSizeCallback& LargeSizeCallback,
- uint64_t LargeSizeLimit = 0);
+ uint64_t LargeSizeLimit);
void CompactBlocks(
const BlockStoreCompactState& CompactState,
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index d95fa7cd4..b3d00fec0 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -82,7 +82,8 @@ public:
virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool);
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
bool ContainsChunk(const IoHash& DecompressedId);
void FilterChunks(HashKeySet& InOutChunks);
void Flush();