aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-12-10 10:37:00 +0100
committerGitHub Enterprise <[email protected]>2025-12-10 10:37:00 +0100
commitf5505816af7a9d8f7f06f7afaa090bf1d4d2b447 (patch)
treeabc2ea571d58b24acfd35eb5d0ec09104754c463 /src
parentfix reading of stale iterator when doing download-resume (#681) (diff)
downloadzen-f5505816af7a9d8f7f06f7afaa090bf1d4d2b447.tar.xz
zen-f5505816af7a9d8f7f06f7afaa090bf1d4d2b447.zip
fix buffer memory in builds cache (#682)
* add --zen-cache-upload option to zen oplog-import command * fix buildstoragecache to not hold on to possibly materialized buffers
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp6
-rw-r--r--src/zenremotestore/builds/buildstoragecache.cpp86
2 files changed, 74 insertions, 18 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 4fdbd3b0d..b6af2176e 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -1385,6 +1385,12 @@ ImportOplogCommand::ImportOplogCommand()
m_Options
.add_option("builds", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>");
+ m_Options.add_option("builds",
+ "",
+ "zen-cache-upload",
+ "Upload data downloaded from remote host to zen cache",
+ cxxopts::value(m_UploadToZenCache),
+ "<uploadtozencache>");
m_Options.add_option("builds", "", "builds-id", "Builds Id", cxxopts::value(m_BuildsId), "<id>");
m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>");
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp
index 9f17e6b15..07fcd62ba 100644
--- a/src/zenremotestore/builds/buildstoragecache.cpp
+++ b/src/zenremotestore/builds/buildstoragecache.cpp
@@ -13,6 +13,7 @@
#include <zenhttp/packageformat.h>
ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
#include <tsl/robin_set.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -94,26 +95,75 @@ public:
{
ZEN_ASSERT(!IsFlushed);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
- ScheduleBackgroundWork(
- [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() {
- ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob");
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
-
- HttpClient::Response CacheResponse =
- m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
- Payload,
- ContentType);
-
- m_Stats.PutBlobCount++;
- m_Stats.PutBlobByteCount += Payload.GetSize();
-
- AddStatistic(CacheResponse);
- if (!CacheResponse.IsSuccess())
+
+ // Move all segments in Payload to be file handle based so if Payload is materialized it does not affect buffers in queue
+ std::vector<SharedBuffer> FileBasedSegments;
+ std::span<const SharedBuffer> Segments = Payload.GetSegments();
+ FileBasedSegments.reserve(Segments.size());
+ {
+ tsl::robin_map<void*, std::filesystem::path> HandleToPath;
+ for (const SharedBuffer& Segment : Segments)
+ {
+ std::filesystem::path FilePath;
+ IoBufferFileReference Ref;
+ if (Segment.AsIoBuffer().GetFileReference(Ref))
{
- ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv));
+ if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end())
+ {
+ FilePath = It->second;
+ }
+ else
+ {
+ std::error_code Ec;
+ std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec);
+ if (!Ec && !Path.empty())
+ {
+ HandleToPath.insert_or_assign(Ref.FileHandle, Path);
+ FilePath = std::move(Path);
+ }
+ }
}
- });
+
+ if (!FilePath.empty())
+ {
+ IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize);
+ if (BufferFromFile)
+ {
+ FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile)));
+ }
+ else
+ {
+ FileBasedSegments.push_back(Segment);
+ }
+ }
+ else
+ {
+ FileBasedSegments.push_back(Segment);
+ }
+ }
+ }
+
+ CompositeBuffer FilePayload(std::move(FileBasedSegments));
+
+ ScheduleBackgroundWork([this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = std::move(FilePayload)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob");
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ Payload,
+ ContentType);
+
+ m_Stats.PutBlobCount++;
+ m_Stats.PutBlobByteCount += Payload.GetSize();
+
+ AddStatistic(CacheResponse);
+ if (!CacheResponse.IsSuccess())
+ {
+ ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv));
+ }
+ });
}
virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override