diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-25 14:49:04 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-11-25 14:49:04 +0100 |
| commit | bcb81b326a373aa86d7e6a046febc8ba74f21c04 (patch) | |
| tree | b20c6d59cefd299b4daac0754c8fab7ec7019b9c /src | |
| parent | stronger validation of payload existance (#229) (diff) | |
| download | zen-bcb81b326a373aa86d7e6a046febc8ba74f21c04.tar.xz zen-bcb81b326a373aa86d7e6a046febc8ba74f21c04.zip | |
caller controls threshold for bulk-loading chunks in IterateChunks (#222)
* Allow caller to control threshold for bulk-loading chunks in IterateChunks
* use smaller batch chunk reading for /fileinfos and /chunkinfos as we do not intend to read the payload
* use smaller batch read buffer when just querying for size of attachments
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 17 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 6 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 3 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 5 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 12 | ||||
| -rw-r--r-- | src/zenstore/cas.h | 3 | ||||
| -rw-r--r-- | src/zenstore/cidstore.cpp | 10 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 8 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 3 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 3 |
12 files changed, 50 insertions, 28 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 551b5a76d..f49f6a645 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -672,7 +672,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque AttachmentsSize += Payload.GetSize(); return true; }, - &WorkerPool); + &WorkerPool, + 8u * 1024u); ResponseWriter << "Count" << AllAttachments.size(); ResponseWriter << "Size" << AttachmentsSize; @@ -765,7 +766,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, AttachmentsSize += Payload.GetSize(); return true; }, - &WorkerPool); + &WorkerPool, + 8u * 1024u); ResponseWriter << "AttachmentsSize" << AttachmentsSize; } diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index a43c9e0e2..3bcc3c51d 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1922,15 +1922,17 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) bool ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { - return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool); + return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } bool ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { std::vector<size_t> CidChunkIndexes; std::vector<IoHash> CidChunkHashes; @@ -1961,7 +1963,8 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, m_CidStore.IterateChunks( CidChunkHashes, [&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); }, - OptionalWorkerPool); + OptionalWorkerPool, + LargeSizeLimit); if (OptionalWorkerPool) { @@ -3995,7 +3998,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, } return true; }, - &GetSmallWorkerPool(EWorkloadType::Burst)); + &GetSmallWorkerPool(EWorkloadType::Burst), + 8u * 1024u); } CbObjectWriter Response; @@ -4149,7 +4153,8 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, } return true; }, - &WorkerPool); + &WorkerPool, + 8u * 1024u); } CbObjectWriter Response; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index d8b13585b..860f2c17d 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -118,10 +118,12 @@ public: IoBuffer GetChunkByRawHash(const IoHash& RawHash); bool IterateChunks(std::span<IoHash> RawHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); bool IterateChunks(std::span<Oid> ChunkIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); inline static const uint32_t kInvalidOp = ~0u; /** Persist a new oplog entry diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 8a6d9c18d..fcf934344 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1634,7 +1634,8 @@ TEST_CASE("blockstore.iterate.chunks") break; } return true; - }); + }, + 0); CHECK(Continue); }); return true; diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 93b639a51..3046ab87c 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -2213,7 +2213,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) }; m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { - return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk); + return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); }); } catch (ScrubDeadlineExpiredException&) @@ -3258,7 +3258,8 @@ ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger, ZEN_UNUSED(ChunkIndex); CaptureAttachments(ChunkIndex, File.GetChunk(Offset, Size).GetView()); return !IsCancelledFlag.load(); - }); + }, + 0); if (Continue) { diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 60dc9a505..4c1836cf7 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -59,7 +59,8 @@ public: virtual void FilterChunks(HashKeySet& InOutChunks) override; virtual bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) override; + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) override; virtual void Flush() override; virtual void ScrubStorage(ScrubContext& Ctx) override; virtual CidStoreSize TotalSize() const override; @@ -392,7 +393,8 @@ CasImpl::FilterChunks(HashKeySet& InOutChunks) bool CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { ZEN_TRACE_CPU("CAS::IterateChunks"); if (!m_SmallStrategy.IterateChunks( @@ -402,7 +404,8 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, Chunk.SetContentType(ZenContentType::kCompressedBinary); return AsyncCallback(Index, Payload); }, - OptionalWorkerPool)) + OptionalWorkerPool, + LargeSizeLimit)) { return false; } @@ -413,7 +416,8 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, Chunk.SetContentType(ZenContentType::kCompressedBinary); return AsyncCallback(Index, Payload); }, - OptionalWorkerPool)) + OptionalWorkerPool, + LargeSizeLimit)) { return false; } diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h index bedbc6a9a..e279dd2cc 100644 --- a/src/zenstore/cas.h +++ b/src/zenstore/cas.h @@ -47,7 +47,8 @@ public: virtual void FilterChunks(HashKeySet& InOutChunks) = 0; virtual bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) = 0; + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) = 0; virtual void Flush() = 0; virtual void ScrubStorage(ScrubContext& Ctx) = 0; virtual CidStoreSize TotalSize() const = 0; diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index 71fd596f4..2ab769d04 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -119,9 +119,10 @@ struct CidStore::Impl bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { - return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool); + return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } void Flush() { m_CasStore.Flush(); } @@ -217,9 +218,10 @@ CidStore::ContainsChunk(const IoHash& DecompressedId) bool CidStore::IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { - return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool); + return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } void diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index bc30301d1..792854af6 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -305,7 +305,8 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) bool CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool) + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { if (ChunkHashes.size() < 3) { @@ -344,7 +345,8 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, }, [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); - }); + }, + LargeSizeLimit); }; Latch WorkLatch(1); @@ -498,7 +500,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) }; m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { - return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk); + return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); }); } catch (const ScrubDeadlineExpiredException&) diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 44567e7a0..07e620086 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -58,7 +58,8 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore void FilterChunks(HashKeySet& InOutChunks); bool IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); void Initialize(const std::filesystem::path& RootDirectory, const std::string_view ContainerBaseName, uint32_t MaxBlockSize, diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index d4c2be73f..8f8f2ccd7 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -167,7 +167,7 @@ public: std::span<const size_t> ChunkIndexes, const IterateChunksSmallSizeCallback& SmallSizeCallback, const IterateChunksLargeSizeCallback& LargeSizeCallback, - uint64_t LargeSizeLimit = 0); + uint64_t LargeSizeLimit); void CompactBlocks( const BlockStoreCompactState& CompactState, diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index d95fa7cd4..b3d00fec0 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -82,7 +82,8 @@ public: virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool); + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); bool ContainsChunk(const IoHash& DecompressedId); void FilterChunks(HashKeySet& InOutChunks); void Flush(); |