aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-30 10:11:25 +0200
committerGitHub Enterprise <[email protected]>2024-04-30 10:11:25 +0200
commitc7a0ddf9f26fdd647574e4031a66362234298e7a (patch)
tree5e3eba1856dc160ca0beb2bc5edaf1ea490a0a6f /src
parentmiscellaneous minor bugfixes (#66) (diff)
downloadzen-c7a0ddf9f26fdd647574e4031a66362234298e7a.tar.xz
zen-c7a0ddf9f26fdd647574e4031a66362234298e7a.zip
fix get project files loop (#68)
- Bugfix: Remove extra loop causing GetProjectFiles for project store to find all chunks once for each chunk found - Bugfix: Don't capture ChunkIndex variable in CasImpl::IterateChunks by reference as it causes crash - Improvement: Make FileCasStrategy::IterateChunks (optionally) multithreaded (improves GetProjectFiles performance)
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp43
-rw-r--r--src/zenstore/cas.cpp13
-rw-r--r--src/zenstore/filecas.cpp49
-rw-r--r--src/zenstore/filecas.h12
-rw-r--r--src/zenutil/packageformat.cpp47
5 files changed, 107 insertions, 57 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 1e53dfd94..afb2c100c 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -938,7 +938,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
{
break;
}
- OptionalWorkerPool->ScheduleWork([&]() {
+ OptionalWorkerPool->ScheduleWork([&WorkLatch, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
if (Result.load() == false)
{
@@ -2730,30 +2730,27 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
RawSizes.resize(Ids.size(), 0u);
}
- for (size_t Index = 0; Index < Ids.size(); Index++)
- {
- FoundLog->IterateChunks(
- Ids,
- [&](size_t Index, const IoBuffer& Payload) {
- uint64_t Size = Payload.GetSize();
- if (WantsRawSizeField)
- {
- uint64_t RawSize = Size;
- if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
- {
- IoHash __;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize);
- }
- RawSizes[Index] = RawSize;
- }
- if (WantsSizeField)
+ FoundLog->IterateChunks(
+ Ids,
+ [&](size_t Index, const IoBuffer& Payload) {
+ uint64_t Size = Payload.GetSize();
+ if (WantsRawSizeField)
+ {
+ uint64_t RawSize = Size;
+ if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
{
- Sizes[Index] = Size;
+ IoHash __;
+ (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize);
}
- return true;
- },
- &GetSmallWorkerPool());
- }
+ RawSizes[Index] = RawSize;
+ }
+ if (WantsSizeField)
+ {
+ Sizes[Index] = Size;
+ }
+ return true;
+ },
+ &GetSmallWorkerPool());
}
CbObjectWriter Response;
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 45d7dd277..67790e2c6 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -430,11 +430,14 @@ CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
{
return false;
}
- if (!m_LargeStrategy.IterateChunks(DecompressedIds, [&](size_t Index, const IoBuffer& Payload) {
- IoBuffer Chunk(Payload);
- Chunk.SetContentType(ZenContentType::kCompressedBinary);
- return AsyncCallback(Index, Payload);
- }))
+ if (!m_LargeStrategy.IterateChunks(
+ DecompressedIds,
+ [&](size_t Index, const IoBuffer& Payload) {
+ IoBuffer Chunk(Payload);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+ return AsyncCallback(Index, Payload);
+ },
+ OptionalWorkerPool))
{
return false;
}
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index df039d4b6..1c6aa539a 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -16,6 +16,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zencore/uid.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/basicfile.h>
@@ -810,7 +811,9 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
}
bool
-FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback)
+FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool)
{
std::vector<size_t> FoundChunkIndexes;
{
@@ -823,19 +826,51 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::functio
}
}
}
- bool Continue = true;
- for (size_t ChunkIndex : FoundChunkIndexes)
+ std::atomic_bool Continue = true;
+ if (!FoundChunkIndexes.empty())
{
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
- if (Payload)
+ auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) {
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]);
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
+ if (Payload)
+ {
+ if (!AsyncCallback(ChunkIndex, std::move(Payload)))
+ {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ Latch WorkLatch(1);
+ for (size_t ChunkIndex : FoundChunkIndexes)
{
- Continue = Callback(ChunkIndex, std::move(Payload));
if (!Continue)
{
break;
}
+ if (OptionalWorkerPool)
+ {
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([&WorkLatch, &ProcessOne, ChunkIndex, &Continue]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (!Continue)
+ {
+ return;
+ }
+ if (!ProcessOne(ChunkIndex))
+ {
+ Continue = false;
+ }
+ });
+ }
+ else
+ {
+ Continue = Continue && ProcessOne(ChunkIndex);
+ }
}
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
return Continue;
}
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
index 06e35de23..07fc36954 100644
--- a/src/zenstore/filecas.h
+++ b/src/zenstore/filecas.h
@@ -39,11 +39,13 @@ struct FileCasStrategy final : public GcStorage, public GcReferenceStore
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(HashKeySet& InOutChunks);
- bool IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback);
- void Flush();
- virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
- virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override;
+ bool IterateChunks(std::span<IoHash> ChunkHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool);
+ void Flush();
+ virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
+ virtual void CollectGarbage(GcContext& GcCtx) override;
+ virtual GcStorageSize StorageSize() const override;
virtual std::string GetGcName(GcCtx& Ctx) override;
virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override;
diff --git a/src/zenutil/packageformat.cpp b/src/zenutil/packageformat.cpp
index 3fa602a96..2512351f5 100644
--- a/src/zenutil/packageformat.cpp
+++ b/src/zenutil/packageformat.cpp
@@ -461,19 +461,21 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
else
{
MalformedAttachments.push_back(std::make_pair(i,
- fmt::format("Invalid format in '{}' (offset {}, size {})",
+ fmt::format("Invalid format in '{}' (offset {}, size {}) for {}",
Path,
AttachRefHdr->PayloadByteOffset,
- AttachRefHdr->PayloadByteSize)));
+ AttachRefHdr->PayloadByteSize,
+ Entry.AttachmentHash)));
}
}
else
{
MalformedAttachments.push_back(std::make_pair(i,
- fmt::format("Unable to resolve chunk at '{}' (offset {}, size {})",
+ fmt::format("Unable to resolve chunk at '{}' (offset {}, size {}) for {}",
Path,
AttachRefHdr->PayloadByteOffset,
- AttachRefHdr->PayloadByteSize)));
+ AttachRefHdr->PayloadByteSize,
+ Entry.AttachmentHash)));
}
}
else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
@@ -490,17 +492,20 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
else
{
// First payload is always a compact binary object
- MalformedAttachments.push_back(std::make_pair(
- i,
- fmt::format("Invalid format, expected compressed buffer for CbObject (size {})", AttachmentBuffer.GetSize())));
+ MalformedAttachments.push_back(
+ std::make_pair(i,
+ fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}",
+ AttachmentBuffer.GetSize(),
+ Entry.AttachmentHash)));
}
}
else
{
- MalformedAttachments.push_back(
- std::make_pair(i,
- fmt::format("Invalid format, compressed object attachments are not currently supported (size {})",
- AttachmentBuffer.GetSize())));
+ MalformedAttachments.push_back(std::make_pair(
+ i,
+ fmt::format("Invalid format, compressed object attachments are not currently supported (size {}) for {}",
+ AttachmentBuffer.GetSize(),
+ Entry.AttachmentHash)));
}
}
else
@@ -512,9 +517,11 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
}
else
{
- MalformedAttachments.push_back(std::make_pair(
- i,
- fmt::format("Invalid format, expected compressed buffer for attachment (size {})", AttachmentBuffer.GetSize())));
+ MalformedAttachments.push_back(
+ std::make_pair(i,
+ fmt::format("Invalid format, expected compressed buffer for attachment (size {}) for {}",
+ AttachmentBuffer.GetSize(),
+ Entry.AttachmentHash)));
}
}
}
@@ -530,11 +537,12 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
{
MalformedAttachments.push_back(
std::make_pair(i,
- fmt::format("Invalid format, object attachments are not currently supported (size {})",
- AttachmentBuffer.GetSize())));
+ fmt::format("Invalid format, object attachments are not currently supported (size {}) for {}",
+ AttachmentBuffer.GetSize(),
+ Entry.AttachmentHash)));
}
}
- else
+ else if (AttachmentSize > 0)
{
// Make a copy of the buffer so the attachments don't reference the entire payload
IoBuffer AttachmentBufferCopy = CreateBuffer(Entry.AttachmentHash, AttachmentSize);
@@ -544,6 +552,11 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
Attachments.emplace_back(SharedBuffer{AttachmentBufferCopy});
}
+ else
+ {
+ MalformedAttachments.push_back(
+ std::make_pair(i, fmt::format("Invalid format, attachment of size zero detected for {}", Entry.AttachmentHash)));
+ }
}
}
PartialFileBuffers.clear();