aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/zenremoteprojectstore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/projectstore/zenremoteprojectstore.cpp
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenserver/projectstore/zenremoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp341
1 files changed, 341 insertions, 0 deletions
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
new file mode 100644
index 000000000..6ff471ae5
--- /dev/null
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -0,0 +1,341 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenremoteprojectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compositebuffer.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+#include <zenhttp/httpshared.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+class ZenRemoteStore : public RemoteProjectStore
+{
+public:
+ ZenRemoteStore(std::string_view HostAddress,
+ std::string_view Project,
+ std::string_view Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize)
+ : m_HostAddress(HostAddress)
+ , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress))
+ , m_Project(Project)
+ , m_Oplog(Oplog)
+ , m_MaxBlockSize(MaxBlockSize)
+ , m_MaxChunkEmbedSize(MaxChunkEmbedSize)
+ {
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)};
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ Session->SetUrl({SaveRequest});
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
+ MemoryView Data(Payload.GetView());
+ Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()});
+ cpr::Response Response = Session->Post();
+ SaveResult Result = SaveResult{ConvertResult(Response)};
+
+ if (Result.ErrorCode)
+ {
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload);
+ if (!ResponseObject)
+ {
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView();
+ for (CbFieldView FieldView : NeedsArray)
+ {
+ IoHash ChunkHash = FieldView.AsHash();
+ Result.Needs.insert(ChunkHash);
+ }
+
+ Result.RawHash = IoHash::HashBuffer(Payload);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
+ Session->SetUrl({SaveRequest});
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
+ cpr::Response Response = Session->Post();
+ SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ Stopwatch Timer;
+
+ CbPackage RequestPackage;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "putchunks"sv);
+ RequestWriter.BeginArray("chunks"sv);
+ {
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize);
+ RequestWriter.AddHash(RawHash);
+ RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash));
+ }
+ }
+ RequestWriter.EndArray(); // "chunks"
+ RequestPackage.SetObject(RequestWriter.Save());
+ }
+ CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault);
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ Session->SetUrl({SaveRequest});
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
+
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
+ cpr::Response Response = Session->Post();
+ SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+
+ CbObject Request;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "getchunks"sv);
+ RequestWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : RawHashes)
+ {
+ RequestWriter.AddHash(RawHash);
+ }
+ }
+ RequestWriter.EndArray(); // "chunks"
+ Request = RequestWriter.Save();
+ }
+ IoBuffer Payload = Request.GetBuffer().AsIoBuffer();
+ Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
+ Session->SetUrl(SaveRequest);
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))},
+ {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
+
+ cpr::Response Response = Session->Post();
+ LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ Result.Chunks.reserve(Attachments.size());
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
+ }
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ };
+
+ virtual Result FinalizeContainer(const IoHash&) override
+ {
+ Stopwatch Timer;
+
+ RwLock::ExclusiveLockScope _(SessionsLock);
+ Sessions.clear();
+ return {.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500};
+ }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ Session->SetUrl(SaveRequest);
+ Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
+ Session->SetParameters(
+ {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}});
+ cpr::Response Response = Session->Get();
+
+ LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size()));
+ if (!Result.ContainerObject)
+ {
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+
+ std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
+ Session->SetUrl({LoadRequest});
+ Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
+ cpr::Response Response = Session->Get();
+ LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+private:
+ std::unique_ptr<cpr::Session> AllocateSession()
+ {
+ RwLock::ExclusiveLockScope _(SessionsLock);
+ if (Sessions.empty())
+ {
+ Sessions.emplace_back(std::make_unique<cpr::Session>());
+ }
+ std::unique_ptr<cpr::Session> Session = std::move(Sessions.back());
+ Sessions.pop_back();
+ return Session;
+ }
+
+ void ReleaseSession(std::unique_ptr<cpr::Session>&& Session)
+ {
+ RwLock::ExclusiveLockScope _(SessionsLock);
+ Sessions.emplace_back(std::move(Session));
+ }
+
+ static Result ConvertResult(const cpr::Response& Response)
+ {
+ std::string Text;
+ std::string Reason = Response.reason;
+ int32_t ErrorCode = 0;
+ if (Response.error.code != cpr::ErrorCode::OK)
+ {
+ ErrorCode = static_cast<int32_t>(Response.error.code);
+ if (!Response.error.message.empty())
+ {
+ Reason = Response.error.message;
+ }
+ }
+ else if (!IsHttpSuccessCode(Response.status_code))
+ {
+ ErrorCode = static_cast<int32_t>(Response.status_code);
+
+ if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
+ {
+ zen::HttpContentType ContentType = zen::ParseContentType(It->second);
+ if (ContentType == zen::HttpContentType::kText)
+ {
+ Text = Response.text;
+ }
+ }
+
+ Reason = fmt::format("{}"sv, Response.status_code);
+ }
+ return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text};
+ }
+
+ RwLock SessionsLock;
+ std::vector<std::unique_ptr<cpr::Session>> Sessions;
+
+ const std::string m_HostAddress;
+ const std::string m_ProjectStoreUrl;
+ const std::string m_Project;
+ const std::string m_Oplog;
+ const size_t m_MaxBlockSize;
+ const size_t m_MaxChunkEmbedSize;
+};
+
+std::unique_ptr<RemoteProjectStore>
+CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
+{
+ std::string Url = Options.Url;
+ if (Url.find("://"sv) == std::string::npos)
+ {
+ // Assume https URL
+ Url = fmt::format("http://{}"sv, Url);
+ }
+ std::unique_ptr<RemoteProjectStore> RemoteStore =
+ std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize);
+ return RemoteStore;
+}
+
+} // namespace zen