diff options
Diffstat (limited to 'src/zenremotestore')
| -rw-r--r-- | src/zenremotestore/builds/buildstoragecache.cpp | 86 |
1 files changed, 68 insertions, 18 deletions
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp index 9f17e6b15..07fcd62ba 100644 --- a/src/zenremotestore/builds/buildstoragecache.cpp +++ b/src/zenremotestore/builds/buildstoragecache.cpp @@ -13,6 +13,7 @@ #include <zenhttp/packageformat.h> ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> #include <tsl/robin_set.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -94,26 +95,75 @@ public: { ZEN_ASSERT(!IsFlushed); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - ScheduleBackgroundWork( - [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - HttpClient::Response CacheResponse = - m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), - Payload, - ContentType); - - m_Stats.PutBlobCount++; - m_Stats.PutBlobByteCount += Payload.GetSize(); - - AddStatistic(CacheResponse); - if (!CacheResponse.IsSuccess()) + + // Move all segments in Payload to be file handle based so if Payload is materialized it does not affect buffers in queue + std::vector<SharedBuffer> FileBasedSegments; + std::span<const SharedBuffer> Segments = Payload.GetSegments(); + FileBasedSegments.reserve(Segments.size()); + { + tsl::robin_map<void*, std::filesystem::path> HandleToPath; + for (const SharedBuffer& Segment : Segments) + { + std::filesystem::path FilePath; + IoBufferFileReference Ref; + if (Segment.AsIoBuffer().GetFileReference(Ref)) { - ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end()) + { + FilePath = It->second; + } + else + { + std::error_code Ec; + std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec); + if (!Ec && !Path.empty()) + { + HandleToPath.insert_or_assign(Ref.FileHandle, Path); + FilePath = std::move(Path); + } + } } - }); + + if (!FilePath.empty()) + { + IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize); + if (BufferFromFile) + { + FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile))); + } + else + { + FileBasedSegments.push_back(Segment); + } + } + else + { + FileBasedSegments.push_back(Segment); + } + } + } + + CompositeBuffer FilePayload(std::move(FileBasedSegments)); + + ScheduleBackgroundWork([this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = std::move(FilePayload)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::Response CacheResponse = + m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + Payload, + ContentType); + + m_Stats.PutBlobCount++; + m_Stats.PutBlobByteCount += Payload.GetSize(); + + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); } virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override |