aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-10-13 21:26:57 +0200
committerStefan Boberg <[email protected]>2021-10-13 21:26:57 +0200
commit9885b4697a11f1b905c4bda2003cacdbaa0281df (patch)
tree0703967c414f0267469b1cb376dbdcda4d60a764
parentprojectstore: Removing support for requesting attachments by hash using offse... (diff)
parentAdd remote execute functions (#15) (diff)
downloadzen-9885b4697a11f1b905c4bda2003cacdbaa0281df.tar.xz
zen-9885b4697a11f1b905c4bda2003cacdbaa0281df.zip
Merge branch 'main' of https://github.com/EpicGames/zen
-rw-r--r--zenserver/upstream/jupiter.cpp79
-rw-r--r--zenserver/upstream/jupiter.h5
2 files changed, 78 insertions, 6 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 0397ddaa0..1c43f1bc6 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -552,7 +552,7 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
if (Response.error)
@@ -583,7 +583,7 @@ CloudCacheSession::BlobExists(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
if (Response.error)
@@ -614,7 +614,7 @@ CloudCacheSession::CompressedBlobExists(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
if (Response.error)
@@ -645,7 +645,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
if (Response.error)
@@ -660,6 +660,77 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
+CloudCacheResult
+CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
+
+ auto& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
+CloudCacheResult
+CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
+
+ auto& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
std::vector<IoHash>
CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
{
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 1de417008..9471ef64f 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -94,13 +94,14 @@ public:
FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
- CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key);
- CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key);
CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key);
CloudCacheResult BlobExists(const IoHash& Key);
CloudCacheResult CompressedBlobExists(const IoHash& Key);
CloudCacheResult ObjectExists(const IoHash& Key);
+ CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData);
+ CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0);
+
std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
private: