diff options
| author | Stefan Boberg <[email protected]> | 2021-10-13 21:26:57 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-10-13 21:26:57 +0200 |
| commit | 9885b4697a11f1b905c4bda2003cacdbaa0281df (patch) | |
| tree | 0703967c414f0267469b1cb376dbdcda4d60a764 | |
| parent | projectstore: Removing support for requesting attachments by hash using offse... (diff) | |
| parent | Add remote execute functions (#15) (diff) | |
| download | zen-9885b4697a11f1b905c4bda2003cacdbaa0281df.tar.xz zen-9885b4697a11f1b905c4bda2003cacdbaa0281df.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 79 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 5 |
2 files changed, 78 insertions, 6 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 0397ddaa0..1c43f1bc6 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -552,7 +552,7 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -583,7 +583,7 @@ CloudCacheSession::BlobExists(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -614,7 +614,7 @@ CloudCacheSession::CompressedBlobExists(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -645,7 +645,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -660,6 +660,77 @@ CloudCacheSession::ObjectExists(const IoHash& Key) return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } +CloudCacheResult +CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; + + auto& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +CloudCacheResult +CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; + + auto& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + std::vector<IoHash> CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 1de417008..9471ef64f 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -94,13 +94,14 @@ public: FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); - CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key); - CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key); CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); CloudCacheResult BlobExists(const IoHash& Key); CloudCacheResult CompressedBlobExists(const IoHash& Key); CloudCacheResult ObjectExists(const IoHash& Key); + CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData); + CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); + std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); private: |