aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp12
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp6
-rw-r--r--src/zenserver/projectstore/projectstore.cpp40
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp2
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h2
-rw-r--r--src/zenserver/projectstore/zencacheremoteprojectstore.cpp735
-rw-r--r--src/zenserver/projectstore/zencacheremoteprojectstore.h21
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp43
8 files changed, 841 insertions, 20 deletions
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
index 5c7a5536a..0673030a1 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -57,15 +57,13 @@ public:
.m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
}
- virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ virtual SaveResult SaveContainer(const CbObject& Container) override
{
Stopwatch Timer;
SaveResult Result;
{
- CbObject ContainerObject = LoadCompactBinaryObject(Payload);
-
- ContainerObject.IterateAttachments([&](CbFieldView FieldView) {
+ Container.IterateAttachments([&](CbFieldView FieldView) {
IoHash AttachmentHash = FieldView.AsBinaryAttachment();
std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash);
if (!std::filesystem::exists(AttachmentPath))
@@ -84,19 +82,19 @@ public:
BasicFile ContainerFile;
ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate);
std::error_code Ec;
- ContainerFile.WriteAll(Payload, Ec);
+ ContainerFile.WriteAll(Container.GetBuffer().AsIoBuffer(), Ec);
if (Ec)
{
throw std::system_error(Ec, Ec.message());
}
- Result.RawHash = IoHash::HashBuffer(Payload);
+ Result.RawHash = IoHash::HashBuffer(Container.GetBuffer().AsIoBuffer());
}
catch (const std::exception& Ex)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
Result.Reason = fmt::format("Failed saving oplog container to '{}'. Reason: {}", ContainerPath, Ex.what());
}
- AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000);
+ AddStats(Container.GetBuffer().GetSize(), 0, Timer.GetElapsedTimeUs() * 1000);
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index bede3abb4..51224aca1 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -66,11 +66,11 @@ public:
.m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
}
- virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ virtual SaveResult SaveContainer(const CbObject& Container) override
{
CloudCacheSession Session(m_CloudClient.Get());
- PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
- AddStats(PutResult);
+ PutRefResult PutResult =
+ Session.PutRef(m_Namespace, m_Bucket, m_Key, Container.GetBuffer().AsIoBuffer(), ZenContentType::kCbObject);
SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
if (Result.ErrorCode)
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 65d2730d8..aba6fbe7e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -25,6 +25,7 @@
#include "fileremoteprojectstore.h"
#include "jupiterremoteprojectstore.h"
#include "remoteprojectstore.h"
+#include "zencacheremoteprojectstore.h"
#include "zenremoteprojectstore.h"
ZEN_THIRD_PARTY_INCLUDES_START
@@ -196,11 +197,11 @@ namespace {
{
std::string_view Url = Zen["url"sv].AsString();
std::string_view Project = Zen["project"sv].AsString();
+ std::string_view Oplog = Zen["oplog"sv].AsString();
if (Project.empty())
{
return {nullptr, "Missing project"};
}
- std::string_view Oplog = Zen["oplog"sv].AsString();
if (Oplog.empty())
{
return {nullptr, "Missing oplog"};
@@ -212,6 +213,43 @@ namespace {
RemoteStore = CreateZenRemoteStore(Options, TempFilePath);
}
+ if (CbObjectView Zen = Params["zencache"sv].AsObjectView(); Zen)
+ {
+ std::string_view Url = Zen["url"sv].AsString();
+ std::string_view Namespace = Zen["namespace"sv].AsString();
+ std::string_view Bucket = Zen["bucket"sv].AsString();
+ std::string_view KeyParam = Zen["key"sv].AsString();
+ bool ForceBlocks = Zen["forceblocks"sv].AsBool(false);
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ if (KeyParam.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ if (KeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid key"};
+ }
+ IoHash Key = IoHash::FromHexString(KeyParam);
+ if (Key == IoHash::Zero)
+ {
+ return {nullptr, "Invalid key string"};
+ }
+ ZenCacheRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ std::string(Url),
+ std::string(Namespace),
+ std::string(Bucket),
+ Key,
+ ForceBlocks};
+ RemoteStore = CreateZenCacheRemoteStore(Options, TempFilePath);
+ }
+
if (!RemoteStore)
{
return {nullptr, "Unknown remote store type"};
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index cc9385f5e..61c04f50a 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -1951,7 +1951,7 @@ SaveOplog(CidStore& ChunkStore,
ChunkCount,
BlockCount));
Stopwatch SaveContainerTimer;
- RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
+ RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject);
TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs();
if (ContainerSaveResult.ErrorCode)
{
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 6b83e526c..96b8e7526 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -90,7 +90,7 @@ public:
virtual RemoteStoreInfo GetInfo() const = 0;
virtual Stats GetStats() const = 0;
- virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0;
+ virtual SaveResult SaveContainer(const CbObject& Container) = 0;
virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0;
virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0;
virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
diff --git a/src/zenserver/projectstore/zencacheremoteprojectstore.cpp b/src/zenserver/projectstore/zencacheremoteprojectstore.cpp
new file mode 100644
index 000000000..7c87aeda0
--- /dev/null
+++ b/src/zenserver/projectstore/zencacheremoteprojectstore.cpp
@@ -0,0 +1,735 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencacheremoteprojectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compositebuffer.h>
+#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
+#include <zenhttp/httpclient.h>
+#include <zenutil/cache/cachepolicy.h>
+#include <zenutil/packageformat.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+class ZenCacheRemoteStore : public RemoteProjectStore
+{
+public:
+ ZenCacheRemoteStore(std::string_view HostAddress,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& Key,
+ bool ForceBlocks,
+ const std::filesystem::path& TempFilePath)
+ : m_HostAddress(HostAddress)
+ , m_ProjectStoreUrl(fmt::format("{}"sv, m_HostAddress))
+ , m_Namespace(Namespace)
+ , m_Bucket(Bucket)
+ , m_Key(Key)
+ , m_ForceBlocks(ForceBlocks)
+ , m_TempFilePath(TempFilePath)
+ , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenCacheRemoteStore", .RetryCount = 2})
+ {
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = m_ForceBlocks,
+ .UseTempBlockFiles = false,
+ .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key),
+ .BaseContainerName = "",
+ .Description = fmt::format("[zen] {}"sv, m_HostAddress)};
+ }
+
+ virtual Stats GetStats() const override
+ {
+ return {.m_SentBytes = m_SentBytes.load(),
+ .m_ReceivedBytes = m_ReceivedBytes.load(),
+ .m_RequestTimeNS = m_RequestTimeNS.load(),
+ .m_RequestCount = m_RequestCount.load(),
+ .m_PeakSentBytes = m_PeakSentBytes.load(),
+ .m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
+ .m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
+ }
+
+ virtual SaveResult SaveContainer(const CbObject& Container) override
+ {
+ std::string SaveRequest = fmt::format("/z$/$rpc");
+
+ CbPackage Package;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("Method"sv, "PutCacheRecords"sv);
+ RequestWriter.AddInteger("Accept"sv, kCbPkgMagic);
+ RequestWriter.BeginObject("Params"sv);
+ {
+ RequestWriter.AddString("Namespace"sv, m_Namespace);
+ RequestWriter.BeginArray("Requests"sv);
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.BeginObject("Key"sv);
+ {
+ RequestWriter.AddString("Bucket"sv, m_Bucket);
+ RequestWriter.AddHash("Hash"sv, m_Key);
+ }
+ RequestWriter.EndObject(); // Key
+ RequestWriter.AddObject("Record"sv, Container);
+ }
+ RequestWriter.EndObject();
+ }
+ RequestWriter.EndArray(); // Requests
+ }
+ RequestWriter.EndObject(); // Params
+ Package.SetObject(RequestWriter.Save());
+ }
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
+ SaveResult Result = SaveResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog container to {}/z$/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ return Result;
+ }
+ CbPackage ResponsePackage = Response.AsPackage();
+ if (!ResponsePackage)
+ {
+ Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv,
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ return Result;
+ }
+ CbObject ResponseObject = ResponsePackage.GetObject();
+ for (CbFieldView Reponse : ResponseObject["Result"sv])
+ {
+ bool Success = Reponse.AsBool();
+ if (!Success)
+ {
+ Result.Reason = "Remote server failed to save oplog container (return Success == false)";
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ return Result;
+ }
+ }
+ CbArrayView NeedsArray = ResponseObject["needs"sv].AsArrayView();
+ for (CbFieldView FieldView : NeedsArray)
+ {
+ IoHash ChunkHash = FieldView.AsHash();
+ Result.Needs.insert(ChunkHash);
+ }
+
+ Result.RawHash = IoHash::HashBuffer(Container.GetBuffer().AsIoBuffer());
+ return Result;
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ {
+ std::string SaveRequest = fmt::format("/z$/$rpc");
+
+ CbPackage Package;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("Method"sv, "PutRecordAttachments"sv);
+ RequestWriter.AddInteger("Accept"sv, kCbPkgMagic);
+ RequestWriter.BeginObject("Params"sv);
+ {
+ RequestWriter.AddString("Namespace"sv, m_Namespace);
+ RequestWriter.BeginArray("Requests"sv);
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.BeginObject("Key"sv);
+ {
+ RequestWriter.AddString("Bucket"sv, m_Bucket);
+ RequestWriter.AddHash("Hash"sv, m_Key);
+ }
+ RequestWriter.EndObject(); // Key
+ RequestWriter.BeginArray("Chunks"sv);
+ {
+ IoHash VerifyRawHash;
+ uint64_t _;
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, _))
+ {
+ ZEN_ASSERT(VerifyRawHash == RawHash);
+ Package.AddAttachment(CbAttachment(Compressed, RawHash));
+ RequestWriter.AddHash(RawHash);
+ }
+ }
+ RequestWriter.EndArray(); // Chunks
+ }
+ RequestWriter.EndObject();
+ }
+ RequestWriter.EndArray(); // Requests
+ }
+ RequestWriter.EndObject(); // Params
+ Package.SetObject(RequestWriter.Save());
+ }
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
+ SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ return Result;
+ }
+ // TODO: Parse attachment results?
+ return Result;
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ std::string SaveRequest = fmt::format("/z$/$rpc");
+
+ CbPackage Package;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("Method"sv, "PutRecordAttachments"sv);
+ RequestWriter.AddInteger("Accept"sv, kCbPkgMagic);
+ RequestWriter.BeginObject("Params"sv);
+ {
+ RequestWriter.AddString("Namespace"sv, m_Namespace);
+ RequestWriter.BeginArray("Requests"sv);
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.BeginObject("Key"sv);
+ {
+ RequestWriter.AddString("Bucket"sv, m_Bucket);
+ RequestWriter.AddHash("Hash"sv, m_Key);
+ }
+ RequestWriter.EndObject(); // Key
+ RequestWriter.BeginArray("Chunks"sv);
+ {
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ IoHash RawHash;
+ uint64_t _;
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, _))
+ {
+ Package.AddAttachment(CbAttachment(Compressed, RawHash));
+ RequestWriter.AddHash(RawHash);
+ }
+ }
+ }
+ RequestWriter.EndArray(); // Chunks
+ }
+ RequestWriter.EndObject();
+ }
+ RequestWriter.EndArray(); // Requests
+ }
+ RequestWriter.EndObject(); // Params
+ Package.SetObject(RequestWriter.Save());
+ }
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
+ SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ return Result;
+ }
+ // TODO: Parse attachment results?
+ return Result;
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ std::string SaveRequest = fmt::format("/z$/$rpc");
+
+ CbPackage Package;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("Method"sv, "GetRecordAttachments"sv);
+ RequestWriter.AddInteger("Accept"sv, kCbPkgMagic);
+ RequestWriter.BeginObject("Params"sv);
+ {
+ RequestWriter.AddString("Namespace"sv, m_Namespace);
+ RequestWriter.BeginArray("Requests"sv);
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.BeginObject("Key"sv);
+ {
+ RequestWriter.AddString("Bucket"sv, m_Bucket);
+ RequestWriter.AddHash("Hash"sv, m_Key);
+ }
+ RequestWriter.EndObject(); // Key
+ RequestWriter.BeginArray("Chunks"sv);
+ {
+ for (const IoHash& RawHash : RawHashes)
+ {
+ RequestWriter.AddHash(RawHash);
+ }
+ }
+ RequestWriter.EndArray(); // Chunks
+ }
+ RequestWriter.EndObject();
+ }
+ RequestWriter.EndArray(); // Requests
+ }
+ RequestWriter.EndObject(); // Params
+ Package.SetObject(RequestWriter.Save());
+ }
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
+ LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed loading oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ return Result;
+ }
+
+ CbPackage ResponsePackage = Response.AsPackage();
+ if (!ResponsePackage)
+ {
+ Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv,
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ return Result;
+ }
+ CbObject ResponseObject = ResponsePackage.GetObject();
+ for (CbFieldView Reponse : ResponseObject["Result"sv])
+ {
+ CbObjectView ResultView = Reponse.AsObjectView();
+ CbArrayView ChunksArrayView = ResultView["Chunks"sv].AsArrayView();
+ for (CbFieldView ChunkView : ChunksArrayView)
+ {
+ IoHash ChunkHash = ChunkView.AsHash();
+ if (ChunkHash != IoHash::Zero)
+ {
+ const CbAttachment* Attachment = ResponsePackage.FindAttachment(ChunkHash);
+ if (Attachment && Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ Result.Chunks.emplace_back(std::make_pair(ChunkHash, Chunk));
+ }
+ }
+ }
+ }
+ return Result;
+ };
+
+ virtual FinalizeResult FinalizeContainer(const IoHash&) override
+ {
+ std::string SaveRequest = fmt::format("/z$/$rpc");
+
+ CbPackage Package;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("Method"sv, "PutCacheRecords"sv);
+ RequestWriter.AddInteger("Accept"sv, kCbPkgMagic);
+ RequestWriter.BeginObject("Params"sv);
+ {
+ RequestWriter.AddString("Namespace"sv, m_Namespace);
+ RequestWriter.BeginArray("Requests"sv);
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.BeginObject("Key"sv);
+ {
+ RequestWriter.AddString("Bucket"sv, m_Bucket);
+ RequestWriter.AddHash("Hash"sv, m_Key);
+ }
+ RequestWriter.EndObject(); // Key
+ // RequestWriter.AddObject("Record"sv, Container);
+ }
+ RequestWriter.EndObject();
+ }
+ RequestWriter.EndArray(); // Requests
+ }
+ RequestWriter.EndObject(); // Params
+ Package.SetObject(RequestWriter.Save());
+ }
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
+ FinalizeResult Result = FinalizeResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog container to {}/z$/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ return Result;
+ }
+ CbPackage ResponsePackage = Response.AsPackage();
+ if (!ResponsePackage)
+ {
+ Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv,
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ return Result;
+ }
+ CbObject ResponseObject = ResponsePackage.GetObject();
+ for (CbFieldView Reponse : ResponseObject["Result"sv])
+ {
+ bool Success = Reponse.AsBool();
+ if (!Success)
+ {
+ Result.Reason = "Remote server failed to save oplog container (return Success == false)";
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ return Result;
+ }
+ }
+ CbArrayView NeedsArray = ResponseObject["needs"sv].AsArrayView();
+ for (CbFieldView FieldView : NeedsArray)
+ {
+ IoHash ChunkHash = FieldView.AsHash();
+ Result.Needs.insert(ChunkHash);
+ }
+
+ return Result;
+ }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ std::string LoadRequest = fmt::format("/z$/$rpc"sv);
+ CbPackage Package;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("Method"sv, "GetCacheRecords"sv);
+ RequestWriter.AddInteger("Accept"sv, kCbPkgMagic);
+ RequestWriter.BeginObject("Params"sv);
+ {
+ RequestWriter.AddString("DefaultPolicy"sv, WriteToString<128>(CachePolicy::SkipData | CachePolicy::QueryLocal));
+ RequestWriter.AddString("Namespace"sv, m_Namespace);
+ RequestWriter.BeginArray("Requests"sv);
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.BeginObject("Key"sv);
+ {
+ RequestWriter.AddString("Bucket"sv, m_Bucket);
+ RequestWriter.AddHash("Hash"sv, m_Key);
+ }
+ RequestWriter.EndObject(); // Key
+ }
+ RequestWriter.EndObject();
+ }
+ RequestWriter.EndArray(); // Requests
+ }
+ RequestWriter.EndObject(); // Params
+ Package.SetObject(RequestWriter.Save());
+ }
+ HttpClient::Response Response = m_Client.Post(LoadRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
+ LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed loading oplog container from {}/z$/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ return Result;
+ }
+
+ CbPackage ResponsePackage = Response.AsPackage();
+ if (!ResponsePackage)
+ {
+ Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv,
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ return Result;
+ }
+
+ CbObject ResponseObject = ResponsePackage.GetObject();
+ for (CbFieldView Reponse : ResponseObject["Result"])
+ {
+ CbObjectView ResultView = Reponse.AsObjectView();
+ MemoryView ResultBuffer = ResultView.GetView();
+ if (ValidateCompactBinary(ResultBuffer, CbValidateMode::Default, CbFieldType::Object) == CbValidateError::None)
+ {
+ Result.ContainerObject =
+ CbObject(SharedBuffer(IoBuffer(IoBuffer::Wrap, ResultBuffer.GetData(), ResultBuffer.GetSize())), CbFieldType::Object);
+ Result.ContainerObject.MakeOwned();
+ }
+ }
+ return Result;
+ }
+
+ virtual LoadContainerResult LoadBaseContainer() override
+ {
+ return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
+ }
+
+ virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) override
+ {
+ std::string SaveRequest = fmt::format("/z$/$rpc");
+
+ CbPackage Package;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("Method"sv, "GetRecordAttachments"sv);
+ RequestWriter.AddInteger("Accept"sv, kCbPkgMagic);
+ RequestWriter.BeginObject("Params"sv);
+ {
+ RequestWriter.AddString("DefaultPolicy"sv, WriteToString<128>(CachePolicy::SkipData | CachePolicy::QueryLocal));
+ RequestWriter.AddString("Namespace"sv, m_Namespace);
+ RequestWriter.BeginArray("Requests"sv);
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.BeginObject("Key"sv);
+ {
+ RequestWriter.AddString("Bucket"sv, m_Bucket);
+ RequestWriter.AddHash("Hash"sv, m_Key);
+ }
+ RequestWriter.EndObject(); // Key
+ RequestWriter.BeginArray("Chunks"sv);
+ {
+ for (const IoHash& RawHash : RawHashes)
+ {
+ RequestWriter.AddHash(RawHash);
+ }
+ }
+ RequestWriter.EndArray(); // Chunks
+ }
+ RequestWriter.EndObject();
+ }
+ RequestWriter.EndArray(); // Requests
+ }
+ RequestWriter.EndObject(); // Params
+ Package.SetObject(RequestWriter.Save());
+ }
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
+ HasAttachmentsResult Result = HasAttachmentsResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed loading oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ return Result;
+ }
+
+ CbPackage ResponsePackage = Response.AsPackage();
+ if (!ResponsePackage)
+ {
+ Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv,
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ return Result;
+ }
+ CbObject ResponseObject = ResponsePackage.GetObject();
+ for (CbFieldView Reponse : ResponseObject["Result"sv])
+ {
+ CbObjectView ResultView = Reponse.AsObjectView();
+ CbArrayView ChunksArrayView = ResultView["Chunks"sv].AsArrayView();
+ size_t ChunkIndex = 0;
+ for (CbFieldView ChunkView : ChunksArrayView)
+ {
+ IoHash ChunkHash = ChunkView.AsHash();
+ if (ChunkHash == IoHash::Zero)
+ {
+ Result.Needs.insert(RawHashes[ChunkIndex]);
+ }
+ ChunkIndex++;
+ }
+ }
+ return Result;
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ std::string SaveRequest = fmt::format("/z$/$rpc");
+
+ CbPackage Package;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("Method"sv, "GetRecordAttachments"sv);
+ RequestWriter.AddInteger("Accept"sv, kCbPkgMagic);
+ RequestWriter.BeginObject("Params"sv);
+ {
+ RequestWriter.AddString("Namespace"sv, m_Namespace);
+ RequestWriter.BeginArray("Requests"sv);
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.BeginObject("Key"sv);
+ {
+ RequestWriter.AddString("Bucket"sv, m_Bucket);
+ RequestWriter.AddHash("Hash"sv, m_Key);
+ }
+ RequestWriter.EndObject(); // Key
+ RequestWriter.BeginArray("Chunks"sv);
+ {
+ RequestWriter.AddHash(RawHash);
+ }
+ RequestWriter.EndArray(); // Chunks
+ }
+ RequestWriter.EndObject();
+ }
+ RequestWriter.EndArray(); // Requests
+ }
+ RequestWriter.EndObject(); // Params
+ Package.SetObject(RequestWriter.Save());
+ }
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
+ LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed loading oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ Result.Reason);
+ return Result;
+ }
+
+ CbPackage ResponsePackage = Response.AsPackage();
+ if (!ResponsePackage)
+ {
+ Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv,
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ return Result;
+ }
+ CbObject ResponseObject = ResponsePackage.GetObject();
+ for (CbFieldView Reponse : ResponseObject["Result"sv])
+ {
+ CbObjectView ResultView = Reponse.AsObjectView();
+ CbArrayView ChunksArrayView = ResultView["Chunks"sv].AsArrayView();
+ for (CbFieldView ChunkView : ChunksArrayView)
+ {
+ IoHash ChunkHash = ChunkView.AsHash();
+ if (ChunkHash == IoHash::Zero)
+ {
+ Result.Reason = fmt::format("The oplog {}/z$/{}/{}/{} is missing attachment {}"sv,
+ m_ProjectStoreUrl,
+ m_Namespace,
+ m_Bucket,
+ m_Key,
+ RawHash);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::NotFound);
+ return Result;
+ }
+ const CbAttachment* Attachment = ResponsePackage.FindAttachment(ChunkHash);
+ if (Attachment && Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ Result.Bytes = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ }
+ }
+ }
+ return Result;
+ }
+
+private:
+ void AddStats(const HttpClient::Response& Result)
+ {
+ m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.UploadedBytes));
+ m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.DownloadedBytes));
+ m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000));
+ SetAtomicMax(m_PeakSentBytes, Result.UploadedBytes);
+ SetAtomicMax(m_PeakReceivedBytes, Result.DownloadedBytes);
+ if (Result.ElapsedSeconds > 0.0)
+ {
+ uint64_t BytesPerSec = static_cast<uint64_t>((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds);
+ SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
+ }
+
+ m_RequestCount.fetch_add(1);
+ }
+
+ static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
+ {
+ if (Response.Error)
+ {
+ return {.ErrorCode = Response.Error.value().ErrorCode,
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Reason = Response.ErrorMessage(""),
+ .Text = Response.ToText()};
+ }
+ if (!Response.IsSuccess())
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Text = Response.ToText()};
+ }
+ return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds};
+ }
+
+ const std::string m_HostAddress;
+ const std::string m_ProjectStoreUrl;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const IoHash m_Key;
+ const bool m_ForceBlocks;
+ const std::filesystem::path m_TempFilePath;
+
+ HttpClient m_Client;
+
+ std::atomic_uint64_t m_SentBytes;
+ std::atomic_uint64_t m_ReceivedBytes;
+ std::atomic_uint64_t m_RequestTimeNS;
+ std::atomic_uint64_t m_RequestCount;
+ std::atomic_uint64_t m_PeakSentBytes;
+ std::atomic_uint64_t m_PeakReceivedBytes;
+ std::atomic_uint64_t m_PeakBytesPerSec;
+};
+
+std::shared_ptr<RemoteProjectStore>
+CreateZenCacheRemoteStore(const ZenCacheRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
+{
+ std::string Url = Options.Url;
+ if (Url.find("://"sv) == std::string::npos)
+ {
+ // Assume http URL
+ Url = fmt::format("http://{}"sv, Url);
+ }
+ std::shared_ptr<RemoteProjectStore> RemoteStore =
+ std::make_shared<ZenCacheRemoteStore>(Url, Options.Namespace, Options.Bucket, Options.Key, Options.ForceBlocks, TempFilePath);
+ return RemoteStore;
+}
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/zencacheremoteprojectstore.h b/src/zenserver/projectstore/zencacheremoteprojectstore.h
new file mode 100644
index 000000000..ce12f3461
--- /dev/null
+++ b/src/zenserver/projectstore/zencacheremoteprojectstore.h
@@ -0,0 +1,21 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "remoteprojectstore.h"
+
+namespace zen {
+
+struct ZenCacheRemoteStoreOptions : RemoteStoreOptions
+{
+ std::string Url;
+ std::string Namespace;
+ std::string Bucket;
+ IoHash Key;
+ bool ForceBlocks = false;
+};
+
+std::shared_ptr<RemoteProjectStore> CreateZenCacheRemoteStore(const ZenCacheRemoteStoreOptions& Options,
+ const std::filesystem::path& TempFilePath);
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index cec68111f..bd7ce921b 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -51,10 +51,10 @@ public:
.m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
}
- virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ virtual SaveResult SaveContainer(const CbObject& Container) override
{
std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog);
- HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Container);
AddStats(Response);
SaveResult Result = SaveResult{ConvertResult(Response)};
@@ -84,7 +84,7 @@ public:
Result.Needs.insert(ChunkHash);
}
- Result.RawHash = IoHash::HashBuffer(Payload);
+ Result.RawHash = IoHash::HashBuffer(Container.GetBuffer().AsIoBuffer());
return Result;
}
@@ -227,11 +227,40 @@ public:
return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
}
- virtual HasAttachmentsResult HasAttachments(const std::span<IoHash>) override
+ virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) override
{
- // For zen as remote store we never store blocks so we should never get here
- ZEN_ASSERT(false);
- return HasAttachmentsResult{};
+ std::string PrepRequest = fmt::format("/{}/oplog/{}/prep"sv, m_Project, m_Oplog);
+ CbObject Request;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.BeginArray("have"sv);
+ for (const IoHash& RawHash : RawHashes)
+ {
+ RequestWriter.AddHash(RawHash);
+ }
+ RequestWriter.EndArray(); // "have"
+ Request = RequestWriter.Save();
+ }
+
+ HttpClient::Response Response = m_Client.Post(PrepRequest, Request, HttpClient::Accept(ZenContentType::kCbObject));
+ HasAttachmentsResult Result = {ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed checking chunk existance from {}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ Result.Reason);
+ }
+ else
+ {
+ CbArrayView NeedsArray = Response.AsObject()["need"].AsArrayView();
+ for (CbFieldView RawHashField : NeedsArray)
+ {
+ Result.Needs.insert(RawHashField.AsHash());
+ }
+ }
+ return Result;
}
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override