// Copyright Epic Games, Inc. All Rights Reserved. #include "jupiter.h" #include "diag/formatters.h" #include "diag/logging.h" #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "Crypt32.lib") # pragma comment(lib, "Wldap32.lib") #endif #include using namespace std::literals; namespace zen { namespace detail { struct CloudCacheSessionState { CloudCacheSessionState(CloudCacheClient& Client) : OwnerClient(Client) {} ~CloudCacheSessionState() {} const CloudCacheAccessToken& GetAccessToken() { if (!AccessToken.IsValid()) { AccessToken = OwnerClient.AcquireAccessToken(); } return AccessToken; } void InvalidateAccessToken() { AccessToken = {}; } void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout) { Session.SetBody({}); Session.SetHeader({}); Session.SetConnectTimeout(ConnectTimeout); Session.SetTimeout(Timeout); AccessToken = GetAccessToken(); } cpr::Session& GetSession() { return Session; } private: friend class zen::CloudCacheClient; CloudCacheClient& OwnerClient; CloudCacheAccessToken AccessToken; cpr::Session Session; }; } // namespace detail CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) { m_SessionState = m_CacheClient->AllocSessionState(); } CloudCacheSession::~CloudCacheSession() { m_CacheClient->FreeSessionState(m_SessionState); } CloudCacheResult CloudCacheSession::Authenticate() { const CloudCacheAccessToken& AccessToken = GetAccessToken(); return {.Success = AccessToken.IsValid()}; } CloudCacheResult CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) { ZEN_TRACE_CPU("HordeClient::GetDerivedData"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } CloudCacheResult CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) { return GetDerivedData(BucketId, Key.ToHexString()); } CloudCacheResult CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } CloudCacheResult CloudCacheSession::GetBlob(const IoHash& Key) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } CloudCacheResult CloudCacheSession::GetCompressedBlob(const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetCompressedBlob"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } CloudCacheResult CloudCacheSession::GetObject(const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetObject"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } CloudCacheResult CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { ZEN_TRACE_CPU("HordeClient::PutDerivedData"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size()); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; auto& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) { return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); } PutRefResult CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { ZEN_TRACE_CPU("HordeClient::PutRef"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { PutRefResult Result; Result.ErrorCode = 401; Result.Reason = "Invalid access token"sv; return Result; } IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption( cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); if (Response.error) { PutRefResult Result; Result.ErrorCode = static_cast(Response.error.code); Result.Reason = std::move(Response.error.message); return Result; } else if (!VerifyAccessToken(Response.status_code)) { PutRefResult Result; Result.ErrorCode = 401; Result.Reason = "Invalid access token"sv; return Result; } PutRefResult Result; Result.Success = (Response.status_code == 200 || Response.status_code == 201); Result.Bytes = Response.uploaded_bytes; Result.ElapsedSeconds = Response.elapsed; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(Response.text, JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } } return Result; } FinalizeRefResult CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) { ZEN_TRACE_CPU("HordeClient::FinalizeRef"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { FinalizeRefResult Result; Result.ErrorCode = 401; Result.Reason = "Invalid access token"sv; return Result; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); Session.SetBody(cpr::Body{}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); if (Response.error) { FinalizeRefResult Result; Result.ErrorCode = static_cast(Response.error.code); Result.Reason = std::move(Response.error.message); return Result; } else if (!VerifyAccessToken(Response.status_code)) { FinalizeRefResult Result; Result.ErrorCode = 401; Result.Reason = "Invalid access token"sv; return Result; } FinalizeRefResult Result; Result.Success = (Response.status_code == 200 || Response.status_code == 201); Result.Bytes = Response.uploaded_bytes; Result.ElapsedSeconds = Response.elapsed; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(Response.text, JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } } return Result; } CloudCacheResult CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("HordeClient::PutBlob"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}}); Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) { ZEN_TRACE_CPU("HordeClient::PutObject"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::RefExists"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } CloudCacheResult CloudCacheSession::BlobExists(const IoHash& Key) { return CacheTypeExists("blobs"sv, Key); } CloudCacheResult CloudCacheSession::CompressedBlobExists(const IoHash& Key) { return CacheTypeExists("compressed-blobs"sv, Key); } CloudCacheResult CloudCacheSession::ObjectExists(const IoHash& Key) { return CacheTypeExists("objects"sv, Key); } CloudCacheExistsResult CloudCacheSession::BlobExists(const std::set& Keys) { return CacheTypeExists("blobs"sv, Keys); } CloudCacheExistsResult CloudCacheSession::CompressedBlobExists(const std::set& Keys) { return CacheTypeExists("compressed-blobs"sv, Keys); } CloudCacheExistsResult CloudCacheSession::ObjectExists(const std::set& Keys) { return CacheTypeExists("objects"sv, Keys); } CloudCacheResult CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) { ZEN_TRACE_CPU("HordeClient::PostComputeTasks"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; auto& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } CloudCacheResult CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; auto& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } CloudCacheResult CloudCacheSession::GetObjectTree(const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetObjectTree"); const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() << "/tree"; cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } std::vector CloudCacheSession::Filter(std::string_view BucketId, const std::vector& ChunkHashes) { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl(); Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace(); ZEN_UNUSED(BucketId, ChunkHashes); return {}; } const CloudCacheAccessToken& CloudCacheSession::GetAccessToken() { return m_SessionState->GetAccessToken(); } bool CloudCacheSession::VerifyAccessToken(long StatusCode) { if (StatusCode == 401) { m_SessionState->InvalidateAccessToken(); return false; } return true; } CloudCacheResult CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) { return {.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}; } else if (!VerifyAccessToken(Response.status_code)) { return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } CloudCacheExistsResult CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set& Keys) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; } ExtendableStringBuilder<256> Query; for (const auto& Key : Keys) { Query << (Query.Size() != 0 ? "&id=" : "id=") << Key.ToHexString(); } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exists?" << Query; cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); if (Response.error) { return {CloudCacheResult{.ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message}}; } else if (!VerifyAccessToken(Response.status_code)) { return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; } CloudCacheExistsResult Result{ CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}}; if (Result.Success) { IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); for (auto& Item : ExistsResponse["id"sv]) { if (Item.IsHash()) { Result.Have.insert(Item.AsHash()); } } } return Result; } ////////////////////////////////////////////////////////////////////////// // // ServiceUrl: https://jupiter.devtools.epicgames.com // DdcNamespace: ue4.ddc // OAuthClientId: 0oao91lrhqPiAlaGD0x7 // OAuthProvider: https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token // OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d // CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options) : m_Log(zen::logging::Get("jupiter")) , m_ServiceUrl(Options.ServiceUrl) , m_OAuthFullUri(Options.OAuthProvider) , m_DdcNamespace(Options.DdcNamespace) , m_BlobStoreNamespace(Options.BlobStoreNamespace) , m_OAuthClientId(Options.OAuthClientId) , m_OAuthSecret(Options.OAuthSecret) , m_AccessToken(Options.AccessToken) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { if (!Options.AccessToken.empty()) { // If an access token was provided, OAuth settings are not used. return; } if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv)) { ZEN_WARN("bad provider specification: '{}' - must be fully qualified", Options.OAuthProvider); m_IsValid = false; return; } // Split into host and Uri substrings auto SchemePos = Options.OAuthProvider.find("://"sv); if (SchemePos == std::string::npos) { ZEN_WARN("Bad service URL passed to cloud cache client: '{}'", Options.ServiceUrl); m_IsValid = false; return; } auto DomainEnd = Options.OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3); if (DomainEnd == std::string::npos) { ZEN_WARN("Bad service URL passed to cloud cache client: '{}' no path delimiter found", Options.ServiceUrl); m_IsValid = false; return; } m_OAuthDomain = Options.OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com m_OAuthUriPath = Options.OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token } CloudCacheClient::~CloudCacheClient() { RwLock::ExclusiveLockScope _(m_SessionStateLock); for (auto State : m_SessionStateCache) { delete State; } } CloudCacheAccessToken CloudCacheClient::AcquireAccessToken() { ZEN_TRACE_CPU("HordeClient::AcquireAccessToken"); using namespace std::chrono; // If an access token was provided, return it instead of querying OAuth if (!m_AccessToken.empty()) { return {m_AccessToken, steady_clock::time_point::max()}; } ExtendableStringBuilder<128> OAuthFormData; OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; std::string Body{OAuthFormData}; cpr::Response Response = cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{Body}); Body = std::move(Response.text); std::string JsonError; json11::Json JsonResponse = json11::Json::parse(Body, JsonError); if (!JsonError.empty()) { return {}; } std::string AccessToken = std::string("Bearer ") + JsonResponse["access_token"].string_value(); int64_t ExpiresInSeconds = static_cast(JsonResponse["expires_in"].int_value()); steady_clock::time_point ExpireTime = steady_clock::now() + seconds(ExpiresInSeconds); return {std::move(AccessToken), ExpireTime}; } detail::CloudCacheSessionState* CloudCacheClient::AllocSessionState() { detail::CloudCacheSessionState* State = nullptr; if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty()) { State = m_SessionStateCache.front(); m_SessionStateCache.pop_front(); } if (State == nullptr) { State = new detail::CloudCacheSessionState(*this); } State->Reset(m_ConnectTimeout, m_Timeout); return State; } void CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) { const bool IsTokenValid = State->AccessToken.IsValid(); RwLock::ExclusiveLockScope _(m_SessionStateLock); m_SessionStateCache.push_front(State); // Invalidate all cached access tokens if any one fails if (!IsTokenValid) { for (auto& CachedState : m_SessionStateCache) { CachedState->AccessToken = {}; } } } } // namespace zen