diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/upstream/jupiter.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/upstream/jupiter.cpp')
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 965 |
1 files changed, 965 insertions, 0 deletions
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp new file mode 100644 index 000000000..dbb185bec --- /dev/null +++ b/src/zenserver/upstream/jupiter.cpp @@ -0,0 +1,965 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "jupiter.h" + +#include "diag/formatters.h" +#include "diag/logging.h" + +#include <zencore/compactbinary.h> +#include <zencore/compositebuffer.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +#include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#if ZEN_PLATFORM_WINDOWS +# pragma comment(lib, "Crypt32.lib") +# pragma comment(lib, "Wldap32.lib") +#endif + +#include <json11.hpp> + +using namespace std::literals; + +namespace zen { + +namespace detail { + struct CloudCacheSessionState + { + CloudCacheSessionState(CloudCacheClient& Client) : m_Client(Client) {} + + const CloudCacheAccessToken& GetAccessToken(bool RefreshToken) + { + if (RefreshToken) + { + m_AccessToken = m_Client.AcquireAccessToken(); + } + + return m_AccessToken; + } + + cpr::Session& GetSession() { return m_Session; } + + void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout) + { + m_Session.SetBody({}); + m_Session.SetHeader({}); + m_Session.SetConnectTimeout(ConnectTimeout); + m_Session.SetTimeout(Timeout); + } + + private: + friend class zen::CloudCacheClient; + + CloudCacheClient& m_Client; + CloudCacheAccessToken m_AccessToken; + cpr::Session m_Session; + }; + +} // namespace detail + +CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) +{ + m_SessionState = m_CacheClient->AllocSessionState(); +} + +CloudCacheSession::~CloudCacheSession() +{ + m_CacheClient->FreeSessionState(m_SessionState); +} + +CloudCacheResult +CloudCacheSession::Authenticate() +{ + const bool RefreshToken = true; + const CloudCacheAccessToken& AccessToken = GetAccessToken(RefreshToken); + + return {.Success = AccessToken.IsValid()}; +} + +CloudCacheResult +CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +{ + const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +CloudCacheResult +CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = + Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +CloudCacheResult +CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("HordeClient::GetCompressedBlob"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +CloudCacheResult +CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash) +{ + ZEN_TRACE_CPU("HordeClient::GetInlineBlob"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + if (auto It = Response.header.find("X-Jupiter-InlinePayloadHash"); It != Response.header.end()) + { + const std::string& PayloadHashHeader = It->second; + if (PayloadHashHeader.length() == IoHash::StringLength) + { + OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); + } + } + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +CloudCacheResult +CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("HordeClient::GetObject"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +PutRefResult +CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) +{ + ZEN_TRACE_CPU("HordeClient::PutRef"); + + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); + + const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption( + cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); + Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); + + cpr::Response Response = Session.Put(); + ZEN_DEBUG("PUT {}", Response); + + if (Response.error) + { + PutRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; + } + else if (!VerifyAccessToken(Response.status_code)) + { + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + PutRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; +} + +FinalizeRefResult +CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + ZEN_TRACE_CPU("HordeClient::FinalizeRef"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" + << RefHash.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, + {"X-Jupiter-IoHash", RefHash.ToHexString()}, + {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + FinalizeRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; + } + else if (!VerifyAccessToken(Response.status_code)) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + FinalizeRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; +} + +CloudCacheResult +CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("HordeClient::PutBlob"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}}); + Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); + + cpr::Response Response = Session.Put(); + ZEN_DEBUG("PUT {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; +} + +CloudCacheResult +CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); + Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); + + cpr::Response Response = Session.Put(); + ZEN_DEBUG("PUT {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; +} + +CloudCacheResult +CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) +{ + ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); + + cpr::Response Response = Session.Put(); + ZEN_DEBUG("PUT {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; +} + +CloudCacheResult +CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) +{ + ZEN_TRACE_CPU("HordeClient::PutObject"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()}); + + cpr::Response Response = Session.Put(); + ZEN_DEBUG("PUT {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; +} + +CloudCacheResult +CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) +{ + ZEN_TRACE_CPU("HordeClient::RefExists"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Head(); + ZEN_DEBUG("HEAD {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +} + +GetObjectReferencesResult +CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("HordeClient::GetObjectReferences"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; + } + + GetObjectReferencesResult Result{ + CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}}; + + if (Result.Success) + { + IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer); + for (auto& Item : ReferencesResponse["references"sv]) + { + Result.References.insert(Item.AsHash()); + } + } + + return Result; +} + +CloudCacheResult +CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "blobs"sv, Key); +} + +CloudCacheResult +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); +} + +CloudCacheResult +CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "objects"sv, Key); +} + +CloudCacheExistsResult +CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "blobs"sv, Keys); +} + +CloudCacheExistsResult +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); +} + +CloudCacheExistsResult +CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "objects"sv, Keys); +} + +CloudCacheResult +CloudCacheSession::PostComputeTasks(IoBuffer TasksData) +{ + ZEN_TRACE_CPU("HordeClient::PostComputeTasks"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +} + +CloudCacheResult +CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) +{ + ZEN_TRACE_CPU("HordeClient::GetComputeUpdates"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster() << "/updates/" << ChannelId + << "?wait=" << WaitSeconds; + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +std::vector<IoHash> +CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl(); + Uri << "/api/v1/s/" << Namespace; + + ZEN_UNUSED(BucketId, ChunkHashes); + + return {}; +} + +cpr::Session& +CloudCacheSession::GetSession() +{ + return m_SessionState->GetSession(); +} + +CloudCacheAccessToken +CloudCacheSession::GetAccessToken(bool RefreshToken) +{ + return m_SessionState->GetAccessToken(RefreshToken); +} + +bool +CloudCacheSession::VerifyAccessToken(long StatusCode) +{ + return StatusCode != 401; +} + +CloudCacheResult +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) +{ + ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Head(); + ZEN_DEBUG("HEAD {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +} + +CloudCacheExistsResult +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) +{ + ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); + + ExtendableStringBuilder<256> Body; + Body << "["; + for (const auto& Key : Keys) + { + Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; + } + Body << "]"; + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist"; + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption( + cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}, {"Content-Type", "application/json"}}); + Session.SetOption(cpr::Body(Body.ToString())); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; + } + + CloudCacheExistsResult Result{ + CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}}; + + if (Result.Success) + { + IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); + for (auto& Item : ExistsResponse["needs"sv]) + { + Result.Needs.insert(Item.AsHash()); + } + } + + return Result; +} + +/** + * An access token provider that holds a token that will never change. + */ +class StaticTokenProvider final : public CloudCacheTokenProvider +{ +public: + StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {} + + virtual ~StaticTokenProvider() = default; + + virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; } + +private: + CloudCacheAccessToken m_Token; +}; + +std::unique_ptr<CloudCacheTokenProvider> +CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token) +{ + return std::make_unique<StaticTokenProvider>(std::move(Token)); +} + +class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider +{ +public: + OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params) + { + m_Url = std::string(Params.Url); + m_ClientId = std::string(Params.ClientId); + m_ClientSecret = std::string(Params.ClientSecret); + } + + virtual ~OAuthClientCredentialsTokenProvider() = default; + + virtual CloudCacheAccessToken AcquireAccessToken() final override + { + using namespace std::chrono; + + std::string Body = + fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret); + + cpr::Response Response = + cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)}); + + if (Response.error || Response.status_code != 200) + { + return {}; + } + + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + + if (JsonError.empty() == false) + { + return {}; + } + + std::string Token = Json["access_token"].string_value(); + int64_t ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value()); + CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds); + + return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime}; + } + +private: + std::string m_Url; + std::string m_ClientId; + std::string m_ClientSecret; +}; + +std::unique_ptr<CloudCacheTokenProvider> +CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params) +{ + return std::make_unique<OAuthClientCredentialsTokenProvider>(Params); +} + +class CallbackTokenProvider final : public CloudCacheTokenProvider +{ +public: + CallbackTokenProvider(std::function<CloudCacheAccessToken()>&& Callback) : m_Callback(std::move(Callback)) {} + + virtual ~CallbackTokenProvider() = default; + + virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); } + +private: + std::function<CloudCacheAccessToken()> m_Callback; +}; + +std::unique_ptr<CloudCacheTokenProvider> +CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback) +{ + return std::make_unique<CallbackTokenProvider>(std::move(Callback)); +} + +CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) +: m_Log(zen::logging::Get("jupiter")) +, m_ServiceUrl(Options.ServiceUrl) +, m_DefaultDdcNamespace(Options.DdcNamespace) +, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) +, m_ComputeCluster(Options.ComputeCluster) +, m_ConnectTimeout(Options.ConnectTimeout) +, m_Timeout(Options.Timeout) +, m_TokenProvider(std::move(TokenProvider)) +{ + ZEN_ASSERT(m_TokenProvider.get() != nullptr); +} + +CloudCacheClient::~CloudCacheClient() +{ + RwLock::ExclusiveLockScope _(m_SessionStateLock); + + for (auto State : m_SessionStateCache) + { + delete State; + } +} + +CloudCacheAccessToken +CloudCacheClient::AcquireAccessToken() +{ + ZEN_TRACE_CPU("HordeClient::AcquireAccessToken"); + + return m_TokenProvider->AcquireAccessToken(); +} + +detail::CloudCacheSessionState* +CloudCacheClient::AllocSessionState() +{ + detail::CloudCacheSessionState* State = nullptr; + + bool IsTokenValid = false; + + { + RwLock::ExclusiveLockScope _(m_SessionStateLock); + + if (m_SessionStateCache.empty() == false) + { + State = m_SessionStateCache.front(); + IsTokenValid = State->m_AccessToken.IsValid(); + + m_SessionStateCache.pop_front(); + } + } + + if (State == nullptr) + { + State = new detail::CloudCacheSessionState(*this); + } + + State->Reset(m_ConnectTimeout, m_Timeout); + + if (IsTokenValid == false) + { + State->m_AccessToken = m_TokenProvider->AcquireAccessToken(); + } + + return State; +} + +void +CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) +{ + RwLock::ExclusiveLockScope _(m_SessionStateLock); + m_SessionStateCache.push_front(State); +} + +} // namespace zen |