diff options
Diffstat (limited to 'zenserver/upstream/jupiter.cpp')
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 79 |
1 files changed, 75 insertions, 4 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 0397ddaa0..1c43f1bc6 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -552,7 +552,7 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -583,7 +583,7 @@ CloudCacheSession::BlobExists(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -614,7 +614,7 @@ CloudCacheSession::CompressedBlobExists(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -645,7 +645,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -660,6 +660,77 @@ CloudCacheSession::ObjectExists(const IoHash& Key) return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } +CloudCacheResult +CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; + + auto& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +CloudCacheResult +CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; + + auto& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + std::vector<IoHash> CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { |