diff options
| author | Dan Engelbrecht <[email protected]> | 2025-12-10 10:37:00 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-12-10 10:37:00 +0100 |
| commit | f5505816af7a9d8f7f06f7afaa090bf1d4d2b447 (patch) | |
| tree | abc2ea571d58b24acfd35eb5d0ec09104754c463 /src | |
| parent | fix reading of stale iterator when doing download-resume (#681) (diff) | |
| download | zen-f5505816af7a9d8f7f06f7afaa090bf1d4d2b447.tar.xz zen-f5505816af7a9d8f7f06f7afaa090bf1d4d2b447.zip | |
fix buffer memory in builds cache (#682)
* add --zen-cache-upload option to zen oplog-import command
* fix buildstoragecache to not hold on to possibly materialized buffers
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 6 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstoragecache.cpp | 86 |
2 files changed, 74 insertions, 18 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 4fdbd3b0d..b6af2176e 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -1385,6 +1385,12 @@ ImportOplogCommand::ImportOplogCommand() m_Options .add_option("builds", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>"); + m_Options.add_option("builds", + "", + "zen-cache-upload", + "Upload data downloaded from remote host to zen cache", + cxxopts::value(m_UploadToZenCache), + "<uploadtozencache>"); m_Options.add_option("builds", "", "builds-id", "Builds Id", cxxopts::value(m_BuildsId), "<id>"); m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>"); diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp index 9f17e6b15..07fcd62ba 100644 --- a/src/zenremotestore/builds/buildstoragecache.cpp +++ b/src/zenremotestore/builds/buildstoragecache.cpp @@ -13,6 +13,7 @@ #include <zenhttp/packageformat.h> ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> #include <tsl/robin_set.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -94,26 +95,75 @@ public: { ZEN_ASSERT(!IsFlushed); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - ScheduleBackgroundWork( - [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - HttpClient::Response CacheResponse = - m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), - Payload, - ContentType); - - m_Stats.PutBlobCount++; - m_Stats.PutBlobByteCount += Payload.GetSize(); - - AddStatistic(CacheResponse); - if (!CacheResponse.IsSuccess()) + + // Move all segments in Payload to be file handle based so if Payload is materialized it does not affect buffers in queue + std::vector<SharedBuffer> FileBasedSegments; + std::span<const SharedBuffer> Segments = Payload.GetSegments(); + FileBasedSegments.reserve(Segments.size()); + { + tsl::robin_map<void*, std::filesystem::path> HandleToPath; + for (const SharedBuffer& Segment : Segments) + { + std::filesystem::path FilePath; + IoBufferFileReference Ref; + if (Segment.AsIoBuffer().GetFileReference(Ref)) { - ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end()) + { + FilePath = It->second; + } + else + { + std::error_code Ec; + std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec); + if (!Ec && !Path.empty()) + { + HandleToPath.insert_or_assign(Ref.FileHandle, Path); + FilePath = std::move(Path); + } + } } - }); + + if (!FilePath.empty()) + { + IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize); + if (BufferFromFile) + { + FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile))); + } + else + { + FileBasedSegments.push_back(Segment); + } + } + else + { + FileBasedSegments.push_back(Segment); + } + } + } + + CompositeBuffer FilePayload(std::move(FileBasedSegments)); + + ScheduleBackgroundWork([this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = std::move(FilePayload)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::Response CacheResponse = + m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + Payload, + ContentType); + + m_Stats.PutBlobCount++; + m_Stats.PutBlobByteCount += Payload.GetSize(); + + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); } virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override |