diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 09:38:05 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 09:38:05 +0200 |
| commit | f5727a1e4d6bfb833e37e1210691d351456dbe3a (patch) | |
| tree | cbf7688c4e31ba7db429a98c7c9f813fa0a826be /src/zenremotestore/projectstore/zenremoteprojectstore.cpp | |
| parent | move projectstore to zenstore (#541) (diff) | |
| download | zen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.tar.xz zen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.zip | |
move remoteproject to remotestorelib (#542)
* move remoteproject code to remotestorelib
Diffstat (limited to 'src/zenremotestore/projectstore/zenremoteprojectstore.cpp')
| -rw-r--r-- | src/zenremotestore/projectstore/zenremoteprojectstore.cpp | 336 |
1 files changed, 336 insertions, 0 deletions
diff --git a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp new file mode 100644 index 000000000..000901e45 --- /dev/null +++ b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp @@ -0,0 +1,336 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/projectstore/zenremoteprojectstore.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compositebuffer.h> +#include <zencore/fmtutils.h> +#include <zencore/stream.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/packageformat.h> + +namespace zen { + +using namespace std::literals; + +class ZenRemoteStore : public RemoteProjectStore +{ +public: + ZenRemoteStore(std::string_view HostAddress, + std::string_view Project, + std::string_view Oplog, + const std::filesystem::path& TempFilePath) + : m_HostAddress(HostAddress) + , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) + , m_Project(Project) + , m_Oplog(Oplog) + , m_TempFilePath(TempFilePath) + , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenRemoteStore", .RetryCount = 2}) + { + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = false, + .UseTempBlockFiles = false, + .AllowChunking = false, + .ContainerName = fmt::format("{}/{}", m_Project, m_Oplog), + .Description = fmt::format("[zen] {}. SessionId: {}"sv, m_HostAddress, m_Client.GetSessionId())}; + } + + virtual Stats GetStats() const override + { + return {.m_SentBytes = m_SentBytes.load(), + .m_ReceivedBytes = m_ReceivedBytes.load(), + .m_RequestTimeNS = m_RequestTimeNS.load(), + .m_RequestCount = m_RequestCount.load(), + .m_PeakSentBytes = m_PeakSentBytes.load(), + .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), + .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; + } + + virtual CreateContainerResult CreateContainer() override + { + // Nothing to do here + return {}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog); + HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject); + AddStats(Response); + SaveResult Result = SaveResult{ConvertResult(Response)}; + + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Project, + m_Oplog, + Result.Reason); + return Result; + } + CbObject ResponseObject = Response.AsObject(); + if (!ResponseObject) + { + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + return Result; + } + CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); + for (CbFieldView FieldView : NeedsArray) + { + IoHash ChunkHash = FieldView.AsHash(); + Result.Needs.insert(ChunkHash); + } + + Result.RawHash = IoHash::HashBuffer(Payload); + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override + { + std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary); + AddStats(Response); + SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Project, + m_Oplog, + RawHash, + Result.Reason); + } + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + CbPackage RequestPackage; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "putchunks"sv); + RequestWriter.BeginArray("chunks"sv); + { + for (const SharedBuffer& Chunk : Chunks) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize); + ZEN_ASSERT(Compressed); + RequestWriter.AddHash(RawHash); + RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash)); + } + } + RequestWriter.EndArray(); // "chunks" + RequestPackage.SetObject(RequestWriter.Save()); + } + std::string SaveRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); + HttpClient::Response Response = m_Client.Post(SaveRequest, RequestPackage); + AddStats(Response); + + SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'", + Chunks.size(), + m_ProjectStoreUrl, + m_Project, + m_Oplog, + Result.Reason); + } + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); + + CbObject Request; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "getchunks"sv); + RequestWriter.BeginObject("Request"sv); + { + RequestWriter.BeginArray("Chunks"sv); + { + for (const IoHash& RawHash : RawHashes) + { + RequestWriter.BeginObject(); + { + RequestWriter.AddHash("RawHash", RawHash); + } + RequestWriter.EndObject(); + } + } + RequestWriter.EndArray(); // "chunks" + } + RequestWriter.EndObject(); + Request = RequestWriter.Save(); + } + + HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); + + LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", + RawHashes.size(), + m_ProjectStoreUrl, + m_Project, + m_Oplog, + Result.Reason); + } + else + { + CbPackage Package = Response.AsPackage(); + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + Result.Chunks.reserve(Attachments.size()); + for (const CbAttachment& Attachment : Attachments) + { + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); + } + } + return Result; + }; + + virtual FinalizeResult FinalizeContainer(const IoHash&) override { return FinalizeResult{Result{}}; } + + virtual LoadContainerResult LoadContainer() override + { + std::string LoadRequest = fmt::format("/{}/oplog/{}/load"sv, m_Project, m_Oplog); + + HttpClient::Response Response = m_Client.Get(LoadRequest, HttpClient::Accept(ZenContentType::kCbObject)); + AddStats(Response); + + LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Project, + m_Oplog, + Result.Reason); + } + else + { + Result.ContainerObject = Response.AsObject(); + if (!Result.ContainerObject) + { + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + } + } + return Result; + } + + virtual GetKnownBlocksResult GetKnownBlocks() override + { + return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = + m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); + AddStats(Response); + + LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + Result.Bytes = Response.ResponsePayload; + Result.Bytes.MakeOwned(); + } + if (!Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Project, + m_Oplog, + RawHash, + Result.Reason); + } + return Result; + } + +private: + void AddStats(const HttpClient::Response& Result) + { + m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.UploadedBytes)); + m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.DownloadedBytes)); + m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000)); + SetAtomicMax(m_PeakSentBytes, Result.UploadedBytes); + SetAtomicMax(m_PeakReceivedBytes, Result.DownloadedBytes); + if (Result.ElapsedSeconds > 0.0) + { + uint64_t BytesPerSec = static_cast<uint64_t>((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds); + SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); + } + + m_RequestCount.fetch_add(1); + } + + static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) + { + if (Response.Error) + { + return {.ErrorCode = Response.Error.value().ErrorCode, + .ElapsedSeconds = Response.ElapsedSeconds, + .Reason = Response.ErrorMessage(""), + .Text = Response.ToText()}; + } + if (!Response.IsSuccess()) + { + return {.ErrorCode = static_cast<int32_t>(Response.StatusCode), + .ElapsedSeconds = Response.ElapsedSeconds, + .Reason = Response.ErrorMessage(ErrorPrefix), + .Text = Response.ToText()}; + } + return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds}; + } + + const std::string m_HostAddress; + const std::string m_ProjectStoreUrl; + const std::string m_Project; + const std::string m_Oplog; + const std::filesystem::path m_TempFilePath; + + HttpClient m_Client; + + std::atomic_uint64_t m_SentBytes = {}; + std::atomic_uint64_t m_ReceivedBytes = {}; + std::atomic_uint64_t m_RequestTimeNS = {}; + std::atomic_uint64_t m_RequestCount = {}; + std::atomic_uint64_t m_PeakSentBytes = {}; + std::atomic_uint64_t m_PeakReceivedBytes = {}; + std::atomic_uint64_t m_PeakBytesPerSec = {}; +}; + +std::shared_ptr<RemoteProjectStore> +CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume http URL + Url = fmt::format("http://{}"sv, Url); + } + std::shared_ptr<RemoteProjectStore> RemoteStore = + std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, TempFilePath); + return RemoteStore; +} + +} // namespace zen |