aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore')
-rw-r--r--src/zenremotestore/builds/buildstoragecache.cpp86
1 files changed, 68 insertions, 18 deletions
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp
index 9f17e6b15..07fcd62ba 100644
--- a/src/zenremotestore/builds/buildstoragecache.cpp
+++ b/src/zenremotestore/builds/buildstoragecache.cpp
@@ -13,6 +13,7 @@
#include <zenhttp/packageformat.h>
ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
#include <tsl/robin_set.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -94,26 +95,75 @@ public:
{
ZEN_ASSERT(!IsFlushed);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
- ScheduleBackgroundWork(
- [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() {
- ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob");
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
-
- HttpClient::Response CacheResponse =
- m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
- Payload,
- ContentType);
-
- m_Stats.PutBlobCount++;
- m_Stats.PutBlobByteCount += Payload.GetSize();
-
- AddStatistic(CacheResponse);
- if (!CacheResponse.IsSuccess())
+
+ // Move all segments in Payload to be file handle based so if Payload is materialized it does not affect buffers in queue
+ std::vector<SharedBuffer> FileBasedSegments;
+ std::span<const SharedBuffer> Segments = Payload.GetSegments();
+ FileBasedSegments.reserve(Segments.size());
+ {
+ tsl::robin_map<void*, std::filesystem::path> HandleToPath;
+ for (const SharedBuffer& Segment : Segments)
+ {
+ std::filesystem::path FilePath;
+ IoBufferFileReference Ref;
+ if (Segment.AsIoBuffer().GetFileReference(Ref))
{
- ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv));
+ if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end())
+ {
+ FilePath = It->second;
+ }
+ else
+ {
+ std::error_code Ec;
+ std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec);
+ if (!Ec && !Path.empty())
+ {
+ HandleToPath.insert_or_assign(Ref.FileHandle, Path);
+ FilePath = std::move(Path);
+ }
+ }
}
- });
+
+ if (!FilePath.empty())
+ {
+ IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize);
+ if (BufferFromFile)
+ {
+ FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile)));
+ }
+ else
+ {
+ FileBasedSegments.push_back(Segment);
+ }
+ }
+ else
+ {
+ FileBasedSegments.push_back(Segment);
+ }
+ }
+ }
+
+ CompositeBuffer FilePayload(std::move(FileBasedSegments));
+
+ ScheduleBackgroundWork([this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = std::move(FilePayload)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob");
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ Payload,
+ ContentType);
+
+ m_Stats.PutBlobCount++;
+ m_Stats.PutBlobByteCount += Payload.GetSize();
+
+ AddStatistic(CacheResponse);
+ if (!CacheResponse.IsSuccess())
+ {
+ ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv));
+ }
+ });
}
virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override