aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/zenremoteprojectstore.cpp
diff options
context:
space:
mode:
authorEPICGAMES\thierry.begin <[email protected]>2024-04-08 10:43:16 -0400
committerEPICGAMES\thierry.begin <[email protected]>2024-04-08 10:43:16 -0400
commitb35e1258a043cab06950b2453f434861d99b918a (patch)
tree695737774fa08ebaa0e32a9f95cb0247c34b3dc3 /src/zenserver/projectstore/zenremoteprojectstore.cpp
parentAdd docker support (diff)
parentMerge pull request #41 from ue-foundation/zs/import-oplog-clean (diff)
downloadzen-tb/docker.tar.xz
zen-tb/docker.zip
Merge branch 'main' of https://github.ol.epicgames.net/ue-foundation/zen into tb/dockertb/docker
Diffstat (limited to 'src/zenserver/projectstore/zenremoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp259
1 files changed, 70 insertions, 189 deletions
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index cfb558040..600338843 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -6,15 +6,10 @@
#include <zencore/compactbinarypackage.h>
#include <zencore/compositebuffer.h>
#include <zencore/fmtutils.h>
-#include <zencore/scopeguard.h>
#include <zencore/stream.h>
-#include <zencore/timer.h>
+#include <zenhttp/httpclient.h>
#include <zenutil/packageformat.h>
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
namespace zen {
using namespace std::literals;
@@ -22,17 +17,16 @@ using namespace std::literals;
class ZenRemoteStore : public RemoteProjectStore
{
public:
- ZenRemoteStore(std::string_view HostAddress,
- std::string_view Project,
- std::string_view Oplog,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize)
+ ZenRemoteStore(std::string_view HostAddress,
+ std::string_view Project,
+ std::string_view Oplog,
+ const std::filesystem::path& TempFilePath)
: m_HostAddress(HostAddress)
, m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress))
, m_Project(Project)
, m_Oplog(Oplog)
- , m_MaxBlockSize(MaxBlockSize)
- , m_MaxChunkEmbedSize(MaxChunkEmbedSize)
+ , m_TempFilePath(TempFilePath)
+ , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenRemoteStore", .RetryCount = 2})
{
}
@@ -47,39 +41,27 @@ public:
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
- Stopwatch Timer;
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
-
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
- Session->SetUrl({SaveRequest});
- Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
- MemoryView Data(Payload.GetView());
- Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()});
- cpr::Response Response = Session->Post();
- SaveResult Result = SaveResult{ConvertResult(Response)};
+ std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject);
+ SaveResult Result = SaveResult{ConvertResult(Response)};
if (Result.ErrorCode)
{
- Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'",
m_ProjectStoreUrl,
m_Project,
m_Oplog,
Result.Reason);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
- IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size());
- CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload);
+ CbObject ResponseObject = Response.AsObject();
if (!ResponseObject)
{
- Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog);
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
return Result;
}
CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView();
@@ -89,33 +71,15 @@ public:
Result.Needs.insert(ChunkHash);
}
- Result.RawHash = IoHash::HashBuffer(Payload);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ Result.RawHash = IoHash::HashBuffer(Payload);
return Result;
}
virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
{
- Stopwatch Timer;
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
-
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
- Session->SetUrl({SaveRequest});
- Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
- cpr::Response Response = Session->Post();
- SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
+ std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary);
+ SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
@@ -125,14 +89,11 @@ public:
RawHash,
Result.Reason);
}
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
{
- Stopwatch Timer;
-
CbPackage RequestPackage;
{
CbObjectWriter RequestWriter;
@@ -151,26 +112,10 @@ public:
RequestWriter.EndArray(); // "chunks"
RequestPackage.SetObject(RequestWriter.Save());
}
- CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault);
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
- Session->SetUrl({SaveRequest});
- Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
+ std::string SaveRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, RequestPackage);
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
- cpr::Response Response = Session->Post();
- SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
+ SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'",
@@ -180,17 +125,12 @@ public:
m_Oplog,
Result.Reason);
}
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
{
- Stopwatch Timer;
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
CbObject Request;
{
@@ -206,17 +146,22 @@ public:
RequestWriter.EndArray(); // "chunks"
Request = RequestWriter.Save();
}
- IoBuffer Payload = Request.GetBuffer().AsIoBuffer();
- Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
- Session->SetUrl(SaveRequest);
- Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))},
- {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
- cpr::Response Response = Session->Post();
- LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
- if (!Result.ErrorCode)
+ HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage));
+
+ LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
{
- CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
+ Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
+ RawHashes.size(),
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ Result.Reason);
+ }
+ else
+ {
+ CbPackage Package = Response.AsPackage();
std::span<const CbAttachment> Attachments = Package.GetAttachments();
Result.Chunks.reserve(Attachments.size());
for (const CbAttachment& Attachment : Attachments)
@@ -225,42 +170,17 @@ public:
std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
}
}
- else
- {
- Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
- RawHashes.size(),
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- Result.Reason);
- }
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
};
- virtual FinalizeResult FinalizeContainer(const IoHash&) override
- {
- Stopwatch Timer;
-
- RwLock::ExclusiveLockScope _(SessionsLock);
- Sessions.clear();
- return FinalizeResult{Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}};
- }
+ virtual FinalizeResult FinalizeContainer(const IoHash&) override { return FinalizeResult{Result{}}; }
virtual LoadContainerResult LoadContainer() override
{
- Stopwatch Timer;
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/load"sv, m_Project, m_Oplog);
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
- Session->SetUrl(SaveRequest);
- Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
- Session->SetParameters(
- {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}});
- cpr::Response Response = Session->Get();
-
- LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
+ HttpClient::Response Response = m_Client.Get(LoadRequest, HttpClient::Accept(ZenContentType::kCbObject));
+ LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'",
@@ -271,7 +191,7 @@ public:
}
else
{
- Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size()));
+ Result.ContainerObject = Response.AsObject();
if (!Result.ContainerObject)
{
Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
@@ -281,7 +201,6 @@ public:
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
}
}
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
@@ -299,19 +218,14 @@ public:
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
- Stopwatch Timer;
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
-
- std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
- Session->SetUrl({LoadRequest});
- Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
- cpr::Response Response = Session->Get();
- LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response =
+ m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
+ LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
if (!Result.ErrorCode)
{
- Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ Result.Bytes = Response.ResponsePayload;
+ Result.Bytes.MakeOwned();
}
if (!Result.ErrorCode)
{
@@ -322,73 +236,40 @@ public:
RawHash,
Result.Reason);
}
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
private:
- std::unique_ptr<cpr::Session> AllocateSession()
- {
- RwLock::ExclusiveLockScope _(SessionsLock);
- if (Sessions.empty())
- {
- Sessions.emplace_back(std::make_unique<cpr::Session>());
- }
- std::unique_ptr<cpr::Session> Session = std::move(Sessions.back());
- Sessions.pop_back();
- return Session;
- }
-
- void ReleaseSession(std::unique_ptr<cpr::Session>&& Session)
+ static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
{
- RwLock::ExclusiveLockScope _(SessionsLock);
- Sessions.emplace_back(std::move(Session));
- }
-
- static Result ConvertResult(const cpr::Response& Response)
- {
- std::string Text;
- std::string Reason = Response.reason;
- int32_t ErrorCode = 0;
- if (Response.error.code != cpr::ErrorCode::OK)
+ if (Response.Error)
{
- ErrorCode = static_cast<int32_t>(Response.error.code);
- if (!Response.error.message.empty())
- {
- Reason = Response.error.message;
- }
+ return {.ErrorCode = Response.Error.value().ErrorCode,
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Reason = Response.ErrorMessage(""),
+ .Text = Response.ToText()};
}
- else if (!IsHttpSuccessCode(Response.status_code))
+ if (!Response.IsSuccess())
{
- ErrorCode = static_cast<int32_t>(Response.status_code);
-
- if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
- {
- zen::HttpContentType ContentType = zen::ParseContentType(It->second);
- if (ContentType == zen::HttpContentType::kText)
- {
- Text = Response.text;
- }
- }
-
- Reason = fmt::format("{}"sv, Response.status_code);
+ return {.ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Text = Response.ToText()};
}
- return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text};
+ return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds};
}
- RwLock SessionsLock;
- std::vector<std::unique_ptr<cpr::Session>> Sessions;
+ const std::string m_HostAddress;
+ const std::string m_ProjectStoreUrl;
+ const std::string m_Project;
+ const std::string m_Oplog;
+ const std::filesystem::path m_TempFilePath;
- const std::string m_HostAddress;
- const std::string m_ProjectStoreUrl;
- const std::string m_Project;
- const std::string m_Oplog;
- const size_t m_MaxBlockSize;
- const size_t m_MaxChunkEmbedSize;
+ HttpClient m_Client;
};
std::shared_ptr<RemoteProjectStore>
-CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
+CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
{
std::string Url = Options.Url;
if (Url.find("://"sv) == std::string::npos)
@@ -397,7 +278,7 @@ CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
Url = fmt::format("http://{}"sv, Url);
}
std::shared_ptr<RemoteProjectStore> RemoteStore =
- std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize);
+ std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, TempFilePath);
return RemoteStore;
}