aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-03 09:38:05 +0200
committerGitHub Enterprise <[email protected]>2025-10-03 09:38:05 +0200
commitf5727a1e4d6bfb833e37e1210691d351456dbe3a (patch)
treecbf7688c4e31ba7db429a98c7c9f813fa0a826be /src/zenremotestore/projectstore/zenremoteprojectstore.cpp
parentmove projectstore to zenstore (#541) (diff)
downloadzen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.tar.xz
zen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.zip
move remoteproject to remotestorelib (#542)
* move remoteproject code to remotestorelib
Diffstat (limited to 'src/zenremotestore/projectstore/zenremoteprojectstore.cpp')
-rw-r--r--src/zenremotestore/projectstore/zenremoteprojectstore.cpp336
1 files changed, 336 insertions, 0 deletions
diff --git a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
new file mode 100644
index 000000000..000901e45
--- /dev/null
+++ b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
@@ -0,0 +1,336 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/projectstore/zenremoteprojectstore.h>
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compositebuffer.h>
+#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/packageformat.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+class ZenRemoteStore : public RemoteProjectStore
+{
+public:
+ ZenRemoteStore(std::string_view HostAddress,
+ std::string_view Project,
+ std::string_view Oplog,
+ const std::filesystem::path& TempFilePath)
+ : m_HostAddress(HostAddress)
+ , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress))
+ , m_Project(Project)
+ , m_Oplog(Oplog)
+ , m_TempFilePath(TempFilePath)
+ , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenRemoteStore", .RetryCount = 2})
+ {
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = false,
+ .UseTempBlockFiles = false,
+ .AllowChunking = false,
+ .ContainerName = fmt::format("{}/{}", m_Project, m_Oplog),
+ .Description = fmt::format("[zen] {}. SessionId: {}"sv, m_HostAddress, m_Client.GetSessionId())};
+ }
+
+ virtual Stats GetStats() const override
+ {
+ return {.m_SentBytes = m_SentBytes.load(),
+ .m_ReceivedBytes = m_ReceivedBytes.load(),
+ .m_RequestTimeNS = m_RequestTimeNS.load(),
+ .m_RequestCount = m_RequestCount.load(),
+ .m_PeakSentBytes = m_PeakSentBytes.load(),
+ .m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
+ .m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
+ }
+
+ virtual CreateContainerResult CreateContainer() override
+ {
+ // Nothing to do here
+ return {};
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject);
+ AddStats(Response);
+ SaveResult Result = SaveResult{ConvertResult(Response)};
+
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ Result.Reason);
+ return Result;
+ }
+ CbObject ResponseObject = Response.AsObject();
+ if (!ResponseObject)
+ {
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ return Result;
+ }
+ CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView();
+ for (CbFieldView FieldView : NeedsArray)
+ {
+ IoHash ChunkHash = FieldView.AsHash();
+ Result.Needs.insert(ChunkHash);
+ }
+
+ Result.RawHash = IoHash::HashBuffer(Payload);
+ return Result;
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override
+ {
+ std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary);
+ AddStats(Response);
+ SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ RawHash,
+ Result.Reason);
+ }
+ return Result;
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ CbPackage RequestPackage;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "putchunks"sv);
+ RequestWriter.BeginArray("chunks"sv);
+ {
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize);
+ ZEN_ASSERT(Compressed);
+ RequestWriter.AddHash(RawHash);
+ RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash));
+ }
+ }
+ RequestWriter.EndArray(); // "chunks"
+ RequestPackage.SetObject(RequestWriter.Save());
+ }
+ std::string SaveRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, RequestPackage);
+ AddStats(Response);
+
+ SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'",
+ Chunks.size(),
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ Result.Reason);
+ }
+ return Result;
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
+
+ CbObject Request;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "getchunks"sv);
+ RequestWriter.BeginObject("Request"sv);
+ {
+ RequestWriter.BeginArray("Chunks"sv);
+ {
+ for (const IoHash& RawHash : RawHashes)
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.AddHash("RawHash", RawHash);
+ }
+ RequestWriter.EndObject();
+ }
+ }
+ RequestWriter.EndArray(); // "chunks"
+ }
+ RequestWriter.EndObject();
+ Request = RequestWriter.Save();
+ }
+
+ HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
+
+ LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
+ RawHashes.size(),
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ Result.Reason);
+ }
+ else
+ {
+ CbPackage Package = Response.AsPackage();
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ Result.Chunks.reserve(Attachments.size());
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
+ }
+ }
+ return Result;
+ };
+
+ virtual FinalizeResult FinalizeContainer(const IoHash&) override { return FinalizeResult{Result{}}; }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/load"sv, m_Project, m_Oplog);
+
+ HttpClient::Response Response = m_Client.Get(LoadRequest, HttpClient::Accept(ZenContentType::kCbObject));
+ AddStats(Response);
+
+ LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ Result.Reason);
+ }
+ else
+ {
+ Result.ContainerObject = Response.AsObject();
+ if (!Result.ContainerObject)
+ {
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ }
+ }
+ return Result;
+ }
+
+ virtual GetKnownBlocksResult GetKnownBlocks() override
+ {
+ return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response =
+ m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
+ AddStats(Response);
+
+ LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ Result.Bytes = Response.ResponsePayload;
+ Result.Bytes.MakeOwned();
+ }
+ if (!Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ RawHash,
+ Result.Reason);
+ }
+ return Result;
+ }
+
+private:
+ void AddStats(const HttpClient::Response& Result)
+ {
+ m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.UploadedBytes));
+ m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.DownloadedBytes));
+ m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000));
+ SetAtomicMax(m_PeakSentBytes, Result.UploadedBytes);
+ SetAtomicMax(m_PeakReceivedBytes, Result.DownloadedBytes);
+ if (Result.ElapsedSeconds > 0.0)
+ {
+ uint64_t BytesPerSec = static_cast<uint64_t>((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds);
+ SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
+ }
+
+ m_RequestCount.fetch_add(1);
+ }
+
+ static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
+ {
+ if (Response.Error)
+ {
+ return {.ErrorCode = Response.Error.value().ErrorCode,
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Reason = Response.ErrorMessage(""),
+ .Text = Response.ToText()};
+ }
+ if (!Response.IsSuccess())
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Text = Response.ToText()};
+ }
+ return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds};
+ }
+
+ const std::string m_HostAddress;
+ const std::string m_ProjectStoreUrl;
+ const std::string m_Project;
+ const std::string m_Oplog;
+ const std::filesystem::path m_TempFilePath;
+
+ HttpClient m_Client;
+
+ std::atomic_uint64_t m_SentBytes = {};
+ std::atomic_uint64_t m_ReceivedBytes = {};
+ std::atomic_uint64_t m_RequestTimeNS = {};
+ std::atomic_uint64_t m_RequestCount = {};
+ std::atomic_uint64_t m_PeakSentBytes = {};
+ std::atomic_uint64_t m_PeakReceivedBytes = {};
+ std::atomic_uint64_t m_PeakBytesPerSec = {};
+};
+
+std::shared_ptr<RemoteProjectStore>
+CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
+{
+ std::string Url = Options.Url;
+ if (Url.find("://"sv) == std::string::npos)
+ {
+ // Assume http URL
+ Url = fmt::format("http://{}"sv, Url);
+ }
+ std::shared_ptr<RemoteProjectStore> RemoteStore =
+ std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, TempFilePath);
+ return RemoteStore;
+}
+
+} // namespace zen