aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-01-26 11:47:41 +0100
committerGitHub Enterprise <[email protected]>2026-01-26 11:47:41 +0100
commit47b6d0219015b2b3d0fb4eda58bd0744d00130e9 (patch)
tree16fedb1b24bd6ad2a7deeb338285f826f5422a41
parentbuilds scanning cache (#727) (diff)
downloadzen-47b6d0219015b2b3d0fb4eda58bd0744d00130e9.tar.xz
zen-47b6d0219015b2b3d0fb4eda58bd0744d00130e9.zip
avoid big ioworker backlog (#733)
* add ability to override scheduling mode in ParallelWork * Don't increase buffering size when copying from local cache with --boost-worker-memory enabled * Rework scheduling writes of downloaded data to reduce memory usage
-rw-r--r--CHANGELOG.md2
-rw-r--r--VERSION.txt2
-rw-r--r--src/zencore/include/zencore/parallelwork.h16
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp75
-rw-r--r--src/zenremotestore/include/zenremotestore/filesystemutils.h4
5 files changed, 59 insertions, 40 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3891459c5..c5d5e8e79 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,8 @@
- Feature: Added experimental zenserver "hub" mode which is used to manage a set of zenserver instances on a host
- Feature: Added `--chunking-cache-path` option to `zen builds upload` and `zen builds diff`
- Path to cache for chunking information of scanned files. Default is empty resulting in no caching
+- Improvement: For `zen builds download`, reworked scheduling writes of downloaded data to reduce memory usage
+- Improvement: For `zen builds download`, reworked copy from local data buffering to reduce memory usage
- Bugfix: Fix oplog navigation elements that were showing up as HTML escape codes instead of the intended characters
## 5.7.18
diff --git a/VERSION.txt b/VERSION.txt
index 9e80ae0ca..c2cb98d47 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1 +1 @@
-5.7.18 \ No newline at end of file
+5.7.19-pre0 \ No newline at end of file
diff --git a/src/zencore/include/zencore/parallelwork.h b/src/zencore/include/zencore/parallelwork.h
index 138d0bc7c..536b0a056 100644
--- a/src/zencore/include/zencore/parallelwork.h
+++ b/src/zencore/include/zencore/parallelwork.h
@@ -21,7 +21,19 @@ public:
typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback;
- void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {})
+ inline void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work) { ScheduleWork(WorkerPool, std::move(Work), {}, m_Mode); }
+
+ inline void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError)
+ {
+ ScheduleWork(WorkerPool, std::move(Work), std::move(OnError), m_Mode);
+ }
+
+ inline void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, WorkerThreadPool::EMode Mode)
+ {
+ ScheduleWork(WorkerPool, std::move(Work), {}, Mode);
+ }
+
+ void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError, WorkerThreadPool::EMode Mode)
{
m_PendingWork.AddCount(1);
try
@@ -42,7 +54,7 @@ public:
OnError(std::current_exception(), m_AbortFlag);
}
},
- m_Mode);
+ Mode);
}
catch (const std::exception& Ex)
{
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 485973e2b..76cbe6540 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -1302,37 +1302,39 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
}
- Work.ScheduleWork(m_IOWorkerPool,
- [this,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &ExistsResult,
- &WritePartsComplete,
- &LooseChunkHashWorks,
- LooseChunkHashWorkIndex,
- TotalRequestCount,
- TotalPartWriteCount,
- &WriteCache,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
- ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
- if (!m_AbortFlag)
- {
- LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
- const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex;
- WriteLooseChunk(RemoteChunkIndex,
- ExistsResult,
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- std::move(LooseChunkHashWork.ChunkTargetPtrs),
- WriteCache,
- Work,
- TotalRequestCount,
- TotalPartWriteCount,
- FilteredDownloadedBytesPerSecond,
- FilteredWrittenBytesPerSecond);
- }
- });
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &ExistsResult,
+ &WritePartsComplete,
+ &LooseChunkHashWorks,
+ LooseChunkHashWorkIndex,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ &WriteCache,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
+ ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
+ if (!m_AbortFlag)
+ {
+ LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
+ const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex;
+ WriteLooseChunk(RemoteChunkIndex,
+ ExistsResult,
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ std::move(LooseChunkHashWork.ChunkTargetPtrs),
+ WriteCache,
+ Work,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ FilteredDownloadedBytesPerSecond,
+ FilteredWrittenBytesPerSecond);
+ }
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
std::unique_ptr<CloneQueryInterface> CloneQuery;
@@ -1592,7 +1594,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
FilteredWrittenBytesPerSecond.Stop();
}
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
});
}
@@ -1780,7 +1783,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
FilteredWrittenBytesPerSecond.Stop();
}
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
}
}
@@ -3750,7 +3754,7 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
break;
}
const uint64_t NextChunkLength = m_RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
- if (ReadLength + NextChunkLength > m_Options.MaximumInMemoryPayloadSize)
+ if (ReadLength + NextChunkLength > BufferedOpenFile::BlockSize)
{
break;
}
@@ -4421,7 +4425,8 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
}
}
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
void
diff --git a/src/zenremotestore/include/zenremotestore/filesystemutils.h b/src/zenremotestore/include/zenremotestore/filesystemutils.h
index cfd6f02e1..cb2d718f7 100644
--- a/src/zenremotestore/include/zenremotestore/filesystemutils.h
+++ b/src/zenremotestore/include/zenremotestore/filesystemutils.h
@@ -12,6 +12,8 @@ class CompositeBuffer;
class BufferedOpenFile
{
public:
+ static constexpr uint64_t BlockSize = 256u * 1024u;
+
BufferedOpenFile(const std::filesystem::path Path,
std::atomic<uint64_t>& OpenReadCount,
std::atomic<uint64_t>& CurrentOpenFileCount,
@@ -30,8 +32,6 @@ public:
void* Handle() { return m_Source.Handle(); }
private:
- const uint64_t BlockSize = 256u * 1024u;
-
BasicFile m_Source;
const uint64_t m_SourceSize;
std::atomic<uint64_t>& m_OpenReadCount;