diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-30 10:11:25 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-30 10:11:25 +0200 |
| commit | c7a0ddf9f26fdd647574e4031a66362234298e7a (patch) | |
| tree | 5e3eba1856dc160ca0beb2bc5edaf1ea490a0a6f /src | |
| parent | miscellaneous minor bugfixes (#66) (diff) | |
| download | zen-c7a0ddf9f26fdd647574e4031a66362234298e7a.tar.xz zen-c7a0ddf9f26fdd647574e4031a66362234298e7a.zip | |
fix get project files loop (#68)
- Bugfix: Remove extra loop causing GetProjectFiles for project store to find all chunks once for each chunk found
- Bugfix: Don't capture ChunkIndex variable in CasImpl::IterateChunks by reference as it causes crash
- Improvement: Make FileCasStrategy::IterateChunks (optionally) multithreaded (improves GetProjectFiles performance)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 43 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 13 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 49 | ||||
| -rw-r--r-- | src/zenstore/filecas.h | 12 | ||||
| -rw-r--r-- | src/zenutil/packageformat.cpp | 47 |
5 files changed, 107 insertions, 57 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 1e53dfd94..afb2c100c 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -938,7 +938,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, { break; } - OptionalWorkerPool->ScheduleWork([&]() { + OptionalWorkerPool->ScheduleWork([&WorkLatch, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); if (Result.load() == false) { @@ -2730,30 +2730,27 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, RawSizes.resize(Ids.size(), 0u); } - for (size_t Index = 0; Index < Ids.size(); Index++) - { - FoundLog->IterateChunks( - Ids, - [&](size_t Index, const IoBuffer& Payload) { - uint64_t Size = Payload.GetSize(); - if (WantsRawSizeField) - { - uint64_t RawSize = Size; - if (Payload.GetContentType() == ZenContentType::kCompressedBinary) - { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize); - } - RawSizes[Index] = RawSize; - } - if (WantsSizeField) + FoundLog->IterateChunks( + Ids, + [&](size_t Index, const IoBuffer& Payload) { + uint64_t Size = Payload.GetSize(); + if (WantsRawSizeField) + { + uint64_t RawSize = Size; + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) { - Sizes[Index] = Size; + IoHash __; + (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize); } - return true; - }, - &GetSmallWorkerPool()); - } + RawSizes[Index] = RawSize; + } + if (WantsSizeField) + { + Sizes[Index] = Size; + } + return true; + }, + &GetSmallWorkerPool()); } CbObjectWriter Response; diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 45d7dd277..67790e2c6 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -430,11 +430,14 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, { return false; } - if (!m_LargeStrategy.IterateChunks(DecompressedIds, [&](size_t Index, const IoBuffer& Payload) { - IoBuffer Chunk(Payload); - Chunk.SetContentType(ZenContentType::kCompressedBinary); - return AsyncCallback(Index, Payload); - })) + if (!m_LargeStrategy.IterateChunks( + DecompressedIds, + [&](size_t Index, const IoBuffer& Payload) { + IoBuffer Chunk(Payload); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + return AsyncCallback(Index, Payload); + }, + OptionalWorkerPool)) { return false; } diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index df039d4b6..1c6aa539a 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -16,6 +16,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zencore/uid.h> +#include <zencore/workthreadpool.h> #include <zenstore/gc.h> #include <zenstore/scrubcontext.h> #include <zenutil/basicfile.h> @@ -810,7 +811,9 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) } bool -FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback) +FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) { std::vector<size_t> FoundChunkIndexes; { @@ -823,19 +826,51 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::functio } } } - bool Continue = true; - for (size_t ChunkIndex : FoundChunkIndexes) + std::atomic_bool Continue = true; + if (!FoundChunkIndexes.empty()) { - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); - if (Payload) + auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) { + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]); + IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + if (Payload) + { + if (!AsyncCallback(ChunkIndex, std::move(Payload))) + { + return false; + } + } + return true; + }; + + Latch WorkLatch(1); + for (size_t ChunkIndex : FoundChunkIndexes) { - Continue = Callback(ChunkIndex, std::move(Payload)); if (!Continue) { break; } + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&WorkLatch, &ProcessOne, ChunkIndex, &Continue]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (!Continue) + { + return; + } + if (!ProcessOne(ChunkIndex)) + { + Continue = false; + } + }); + } + else + { + Continue = Continue && ProcessOne(ChunkIndex); + } } + WorkLatch.CountDown(); + WorkLatch.Wait(); } return Continue; } diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h index 06e35de23..07fc36954 100644 --- a/src/zenstore/filecas.h +++ b/src/zenstore/filecas.h @@ -39,11 +39,13 @@ struct FileCasStrategy final : public GcStorage, public GcReferenceStore IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(HashKeySet& InOutChunks); - bool IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback); - void Flush(); - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; - virtual void CollectGarbage(GcContext& GcCtx) override; - virtual GcStorageSize StorageSize() const override; + bool IterateChunks(std::span<IoHash> ChunkHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool); + void Flush(); + virtual void ScrubStorage(ScrubContext& ScrubCtx) override; + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override; virtual std::string GetGcName(GcCtx& Ctx) override; virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override; diff --git a/src/zenutil/packageformat.cpp b/src/zenutil/packageformat.cpp index 3fa602a96..2512351f5 100644 --- a/src/zenutil/packageformat.cpp +++ b/src/zenutil/packageformat.cpp @@ -461,19 +461,21 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint else { MalformedAttachments.push_back(std::make_pair(i, - fmt::format("Invalid format in '{}' (offset {}, size {})", + fmt::format("Invalid format in '{}' (offset {}, size {}) for {}", Path, AttachRefHdr->PayloadByteOffset, - AttachRefHdr->PayloadByteSize))); + AttachRefHdr->PayloadByteSize, + Entry.AttachmentHash))); } } else { MalformedAttachments.push_back(std::make_pair(i, - fmt::format("Unable to resolve chunk at '{}' (offset {}, size {})", + fmt::format("Unable to resolve chunk at '{}' (offset {}, size {}) for {}", Path, AttachRefHdr->PayloadByteOffset, - AttachRefHdr->PayloadByteSize))); + AttachRefHdr->PayloadByteSize, + Entry.AttachmentHash))); } } else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) @@ -490,17 +492,20 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint else { // First payload is always a compact binary object - MalformedAttachments.push_back(std::make_pair( - i, - fmt::format("Invalid format, expected compressed buffer for CbObject (size {})", AttachmentBuffer.GetSize()))); + MalformedAttachments.push_back( + std::make_pair(i, + fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}", + AttachmentBuffer.GetSize(), + Entry.AttachmentHash))); } } else { - MalformedAttachments.push_back( - std::make_pair(i, - fmt::format("Invalid format, compressed object attachments are not currently supported (size {})", - AttachmentBuffer.GetSize()))); + MalformedAttachments.push_back(std::make_pair( + i, + fmt::format("Invalid format, compressed object attachments are not currently supported (size {}) for {}", + AttachmentBuffer.GetSize(), + Entry.AttachmentHash))); } } else @@ -512,9 +517,11 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint } else { - MalformedAttachments.push_back(std::make_pair( - i, - fmt::format("Invalid format, expected compressed buffer for attachment (size {})", AttachmentBuffer.GetSize()))); + MalformedAttachments.push_back( + std::make_pair(i, + fmt::format("Invalid format, expected compressed buffer for attachment (size {}) for {}", + AttachmentBuffer.GetSize(), + Entry.AttachmentHash))); } } } @@ -530,11 +537,12 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint { MalformedAttachments.push_back( std::make_pair(i, - fmt::format("Invalid format, object attachments are not currently supported (size {})", - AttachmentBuffer.GetSize()))); + fmt::format("Invalid format, object attachments are not currently supported (size {}) for {}", + AttachmentBuffer.GetSize(), + Entry.AttachmentHash))); } } - else + else if (AttachmentSize > 0) { // Make a copy of the buffer so the attachments don't reference the entire payload IoBuffer AttachmentBufferCopy = CreateBuffer(Entry.AttachmentHash, AttachmentSize); @@ -544,6 +552,11 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint Attachments.emplace_back(SharedBuffer{AttachmentBufferCopy}); } + else + { + MalformedAttachments.push_back( + std::make_pair(i, fmt::format("Invalid format, attachment of size zero detected for {}", Entry.AttachmentHash))); + } } } PartialFileBuffers.clear(); |