diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /zenserver/upstream/jupiter.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'zenserver/upstream/jupiter.cpp')
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 965 |
1 files changed, 0 insertions, 965 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp deleted file mode 100644 index dbb185bec..000000000 --- a/zenserver/upstream/jupiter.cpp +++ /dev/null @@ -1,965 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "jupiter.h" - -#include "diag/formatters.h" -#include "diag/logging.h" - -#include <zencore/compactbinary.h> -#include <zencore/compositebuffer.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/string.h> -#include <zencore/thread.h> -#include <zencore/trace.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -#include <fmt/format.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#if ZEN_PLATFORM_WINDOWS -# pragma comment(lib, "Crypt32.lib") -# pragma comment(lib, "Wldap32.lib") -#endif - -#include <json11.hpp> - -using namespace std::literals; - -namespace zen { - -namespace detail { - struct CloudCacheSessionState - { - CloudCacheSessionState(CloudCacheClient& Client) : m_Client(Client) {} - - const CloudCacheAccessToken& GetAccessToken(bool RefreshToken) - { - if (RefreshToken) - { - m_AccessToken = m_Client.AcquireAccessToken(); - } - - return m_AccessToken; - } - - cpr::Session& GetSession() { return m_Session; } - - void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout) - { - m_Session.SetBody({}); - m_Session.SetHeader({}); - m_Session.SetConnectTimeout(ConnectTimeout); - m_Session.SetTimeout(Timeout); - } - - private: - friend class zen::CloudCacheClient; - - CloudCacheClient& m_Client; - CloudCacheAccessToken m_AccessToken; - cpr::Session m_Session; - }; - -} // namespace detail - -CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) -{ - m_SessionState = m_CacheClient->AllocSessionState(); -} - -CloudCacheSession::~CloudCacheSession() -{ - m_CacheClient->FreeSessionState(m_SessionState); -} - -CloudCacheResult -CloudCacheSession::Authenticate() -{ - const bool RefreshToken = true; - const CloudCacheAccessToken& AccessToken = GetAccessToken(RefreshToken); - - return {.Success = AccessToken.IsValid()}; -} - -CloudCacheResult -CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) -{ - const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; -} - -CloudCacheResult -CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) -{ - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = - Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; -} - -CloudCacheResult -CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("HordeClient::GetCompressedBlob"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; -} - -CloudCacheResult -CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash) -{ - ZEN_TRACE_CPU("HordeClient::GetInlineBlob"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - if (auto It = Response.header.find("X-Jupiter-InlinePayloadHash"); It != Response.header.end()) - { - const std::string& PayloadHashHeader = It->second; - if (PayloadHashHeader.length() == IoHash::StringLength) - { - OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); - } - } - - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; -} - -CloudCacheResult -CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("HordeClient::GetObject"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; -} - -PutRefResult -CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) -{ - ZEN_TRACE_CPU("HordeClient::PutRef"); - - IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - - const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); - Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - if (Response.error) - { - PutRefResult Result; - Result.ErrorCode = static_cast<int32_t>(Response.error.code); - Result.Reason = std::move(Response.error.message); - return Result; - } - else if (!VerifyAccessToken(Response.status_code)) - { - PutRefResult Result; - Result.ErrorCode = 401; - Result.Reason = "Invalid access token"sv; - return Result; - } - - PutRefResult Result; - Result.Success = (Response.status_code == 200 || Response.status_code == 201); - Result.Bytes = Response.uploaded_bytes; - Result.ElapsedSeconds = Response.elapsed; - - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - - return Result; -} - -FinalizeRefResult -CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) -{ - ZEN_TRACE_CPU("HordeClient::FinalizeRef"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" - << RefHash.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, - {"X-Jupiter-IoHash", RefHash.ToHexString()}, - {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{}); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); - - if (Response.error) - { - FinalizeRefResult Result; - Result.ErrorCode = static_cast<int32_t>(Response.error.code); - Result.Reason = std::move(Response.error.message); - return Result; - } - else if (!VerifyAccessToken(Response.status_code)) - { - FinalizeRefResult Result; - Result.ErrorCode = 401; - Result.Reason = "Invalid access token"sv; - return Result; - } - - FinalizeRefResult Result; - Result.Success = (Response.status_code == 200 || Response.status_code == 201); - Result.Bytes = Response.uploaded_bytes; - Result.ElapsedSeconds = Response.elapsed; - - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - - return Result; -} - -CloudCacheResult -CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("HordeClient::PutBlob"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}}); - Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; -} - -CloudCacheResult -CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); - Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; -} - -CloudCacheResult -CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) -{ - ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; -} - -CloudCacheResult -CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) -{ - ZEN_TRACE_CPU("HordeClient::PutObject"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; -} - -CloudCacheResult -CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) -{ - ZEN_TRACE_CPU("HordeClient::RefExists"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; -} - -GetObjectReferencesResult -CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("HordeClient::GetObjectReferences"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); - - if (Response.error) - { - return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; - } - - GetObjectReferencesResult Result{ - CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}}; - - if (Result.Success) - { - IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer); - for (auto& Item : ReferencesResponse["references"sv]) - { - Result.References.insert(Item.AsHash()); - } - } - - return Result; -} - -CloudCacheResult -CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "blobs"sv, Key); -} - -CloudCacheResult -CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); -} - -CloudCacheResult -CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "objects"sv, Key); -} - -CloudCacheExistsResult -CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "blobs"sv, Keys); -} - -CloudCacheExistsResult -CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); -} - -CloudCacheExistsResult -CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "objects"sv, Keys); -} - -CloudCacheResult -CloudCacheSession::PostComputeTasks(IoBuffer TasksData) -{ - ZEN_TRACE_CPU("HordeClient::PostComputeTasks"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; -} - -CloudCacheResult -CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) -{ - ZEN_TRACE_CPU("HordeClient::GetComputeUpdates"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster() << "/updates/" << ChannelId - << "?wait=" << WaitSeconds; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; -} - -std::vector<IoHash> -CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) -{ - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << Namespace; - - ZEN_UNUSED(BucketId, ChunkHashes); - - return {}; -} - -cpr::Session& -CloudCacheSession::GetSession() -{ - return m_SessionState->GetSession(); -} - -CloudCacheAccessToken -CloudCacheSession::GetAccessToken(bool RefreshToken) -{ - return m_SessionState->GetAccessToken(RefreshToken); -} - -bool -CloudCacheSession::VerifyAccessToken(long StatusCode) -{ - return StatusCode != 401; -} - -CloudCacheResult -CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) -{ - ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; -} - -CloudCacheExistsResult -CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) -{ - ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); - - ExtendableStringBuilder<256> Body; - Body << "["; - for (const auto& Key : Keys) - { - Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; - } - Body << "]"; - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist"; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}, {"Content-Type", "application/json"}}); - Session.SetOption(cpr::Body(Body.ToString())); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); - - if (Response.error) - { - return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; - } - - CloudCacheExistsResult Result{ - CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}}; - - if (Result.Success) - { - IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); - for (auto& Item : ExistsResponse["needs"sv]) - { - Result.Needs.insert(Item.AsHash()); - } - } - - return Result; -} - -/** - * An access token provider that holds a token that will never change. - */ -class StaticTokenProvider final : public CloudCacheTokenProvider -{ -public: - StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {} - - virtual ~StaticTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; } - -private: - CloudCacheAccessToken m_Token; -}; - -std::unique_ptr<CloudCacheTokenProvider> -CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token) -{ - return std::make_unique<StaticTokenProvider>(std::move(Token)); -} - -class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider -{ -public: - OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params) - { - m_Url = std::string(Params.Url); - m_ClientId = std::string(Params.ClientId); - m_ClientSecret = std::string(Params.ClientSecret); - } - - virtual ~OAuthClientCredentialsTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() final override - { - using namespace std::chrono; - - std::string Body = - fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret); - - cpr::Response Response = - cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)}); - - if (Response.error || Response.status_code != 200) - { - return {}; - } - - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); - - if (JsonError.empty() == false) - { - return {}; - } - - std::string Token = Json["access_token"].string_value(); - int64_t ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value()); - CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds); - - return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime}; - } - -private: - std::string m_Url; - std::string m_ClientId; - std::string m_ClientSecret; -}; - -std::unique_ptr<CloudCacheTokenProvider> -CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params) -{ - return std::make_unique<OAuthClientCredentialsTokenProvider>(Params); -} - -class CallbackTokenProvider final : public CloudCacheTokenProvider -{ -public: - CallbackTokenProvider(std::function<CloudCacheAccessToken()>&& Callback) : m_Callback(std::move(Callback)) {} - - virtual ~CallbackTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); } - -private: - std::function<CloudCacheAccessToken()> m_Callback; -}; - -std::unique_ptr<CloudCacheTokenProvider> -CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback) -{ - return std::make_unique<CallbackTokenProvider>(std::move(Callback)); -} - -CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) -: m_Log(zen::logging::Get("jupiter")) -, m_ServiceUrl(Options.ServiceUrl) -, m_DefaultDdcNamespace(Options.DdcNamespace) -, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) -, m_ComputeCluster(Options.ComputeCluster) -, m_ConnectTimeout(Options.ConnectTimeout) -, m_Timeout(Options.Timeout) -, m_TokenProvider(std::move(TokenProvider)) -{ - ZEN_ASSERT(m_TokenProvider.get() != nullptr); -} - -CloudCacheClient::~CloudCacheClient() -{ - RwLock::ExclusiveLockScope _(m_SessionStateLock); - - for (auto State : m_SessionStateCache) - { - delete State; - } -} - -CloudCacheAccessToken -CloudCacheClient::AcquireAccessToken() -{ - ZEN_TRACE_CPU("HordeClient::AcquireAccessToken"); - - return m_TokenProvider->AcquireAccessToken(); -} - -detail::CloudCacheSessionState* -CloudCacheClient::AllocSessionState() -{ - detail::CloudCacheSessionState* State = nullptr; - - bool IsTokenValid = false; - - { - RwLock::ExclusiveLockScope _(m_SessionStateLock); - - if (m_SessionStateCache.empty() == false) - { - State = m_SessionStateCache.front(); - IsTokenValid = State->m_AccessToken.IsValid(); - - m_SessionStateCache.pop_front(); - } - } - - if (State == nullptr) - { - State = new detail::CloudCacheSessionState(*this); - } - - State->Reset(m_ConnectTimeout, m_Timeout); - - if (IsTokenValid == false) - { - State->m_AccessToken = m_TokenProvider->AcquireAccessToken(); - } - - return State; -} - -void -CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) -{ - RwLock::ExclusiveLockScope _(m_SessionStateLock); - m_SessionStateCache.push_front(State); -} - -} // namespace zen |