// Copyright Epic Games, Inc. All Rights Reserved. #include "jupiter.h" #include "diag/logging.h" #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "Crypt32.lib") # pragma comment(lib, "Wldap32.lib") #endif #include using namespace std::literals; namespace zen { namespace detail { struct CloudCacheSessionState { CloudCacheSessionState(CloudCacheClient& Client) : m_Client(Client) {} const CloudCacheAccessToken& GetAccessToken(bool RefreshToken) { if (RefreshToken) { m_AccessToken = m_Client.AcquireAccessToken(); } return m_AccessToken; } cpr::Session& GetSession() { return m_Session; } void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout, bool AssumeHttp2) { m_Session.SetBody({}); m_Session.SetHeader({}); m_Session.SetConnectTimeout(ConnectTimeout); m_Session.SetTimeout(Timeout); if (AssumeHttp2) { m_Session.SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE}); } } private: friend class zen::CloudCacheClient; CloudCacheClient& m_Client; CloudCacheAccessToken m_AccessToken; cpr::Session m_Session; }; CloudCacheResult ConvertResponse(const cpr::Response& Response) { if (Response.error) { return {.ElapsedSeconds = Response.elapsed, .ErrorCode = static_cast(Response.error.code), .Reason = Response.error.message, .Success = false}; } if (!IsHttpSuccessCode(Response.status_code)) { return {.ElapsedSeconds = Response.elapsed, .ErrorCode = static_cast(Response.status_code), .Reason = Response.reason.empty() ? Response.text : Response.reason, .Success = false}; } return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .ErrorCode = 0, .Reason = Response.reason, .Success = true}; } cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer) { if (TempFolderPath.empty()) { return Session.Get(); } std::string PayloadString; std::shared_ptr PayloadFile; auto _ = MakeGuard([&]() { if (PayloadFile) { PayloadFile.reset(); std::filesystem::path TempPath = TempFolderPath / Name; std::error_code Ec; std::filesystem::remove(TempPath, Ec); } }); uint64_t Offset = 0; Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) { if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) { std::filesystem::path TempPath = TempFolderPath / Name; PayloadFile = std::make_shared(); PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete); PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset); Offset += PayloadString.size(); PayloadString.clear(); } if (PayloadFile) { PayloadFile->Write(data.data(), data.size(), Offset); Offset += data.size(); } else { PayloadString.append(data); } return true; }}); cpr::Response Response = Session.Get(); if (!Response.error && IsHttpSuccessCode(Response.status_code)) { if (PayloadFile) { uint64_t PayloadSize = PayloadFile->FileSize(); void* FileHandle = PayloadFile->Detach(); PayloadFile.reset(); OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true); OutBuffer.SetDeleteOnClose(true); } else { OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size()); } return Response; } Response.text.swap(PayloadString); return Response; } } // namespace detail CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) { m_SessionState = m_CacheClient->AllocSessionState(); } CloudCacheSession::~CloudCacheSession() { m_CacheClient->FreeSessionState(m_SessionState); } CloudCacheResult CloudCacheSession::Authenticate() { const bool RefreshToken = true; const CloudCacheAccessToken& AccessToken = GetAccessToken(RefreshToken); return {.Success = AccessToken.IsValid()}; } CloudCacheResult CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (Result.Success) { Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); } else { ZEN_WARN( "CloudCacheSession::GetRef failed GET. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-Accept: '{}', " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), ContentType, Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheResult CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (Result.Success) { Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); } else { ZEN_WARN( "CloudCacheSession::GetBlob failed GET. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-Accept: '{}', " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/octet-stream", Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheResult CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); ExtendableStringBuilder<256> Uri; std::string KeyString = Key.ToHexString(); Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); Session.SetOption(cpr::Body{}); IoBuffer Payload; cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); ZEN_DEBUG("GET {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (Result.Success) { Result.Response = std::move(Payload); } else { ZEN_WARN( "CloudCacheSession::GetCompressedBlob failed GET. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-Accept: '{}', " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/x-ue-comp", Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheResult CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash, std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); ExtendableStringBuilder<256> Uri; std::string KeyString = Key.ToHexString(); Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}}); Session.SetOption(cpr::Body{}); IoBuffer Payload; cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); ZEN_DEBUG("GET {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (Result.Success) { Result.Response = std::move(Payload); } else { ZEN_WARN( "CloudCacheSession::GetInlineBlob failed GET. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-Accept: '{}', " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/x-jupiter-inline", Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } if (auto It = Response.header.find("X-Jupiter-InlinePayloadHash"); It != Response.header.end()) { const std::string& PayloadHashHeader = It->second; if (PayloadHashHeader.length() == IoHash::StringLength) { OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); } } return Result; } CloudCacheResult CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetObject"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (Result.Success) { Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); } else { ZEN_WARN( "CloudCacheSession::GetObject failed GET. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-Accept: '{}', " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/x-ue-cb", Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } PutRefResult CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { ZEN_TRACE_CPU("JupiterClient::PutRef"); IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption( cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); PutRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(Response.text, JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } Result.RawHash = Hash; } else { ZEN_WARN( "CloudCacheSession::PutRef failed PUT. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-X-Jupiter-IoHash: '{}', " "Header-ContentType: '{}', " "ContentSize: {}, " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), Hash.ToHexString(), ContentType, NiceBytes(Ref.Size()), Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } FinalizeRefResult CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) { ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); Session.SetBody(cpr::Body{}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); FinalizeRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(Response.text, JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } } else { ZEN_WARN( "CloudCacheSession::FinalizeRef failed PUT. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-X-Jupiter-IoHash: '{}', " "Header-ContentType: '{}', " "ContentSize: {}, " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), RefHash.ToHexString(), "application/x-ue-cb", NiceBytes(0), Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheResult CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("JupiterClient::PutBlob"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}}); Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (!Result.Success) { ZEN_WARN( "CloudCacheSession::PutBlob failed PUT. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-ContentType: '{}', " "ContentSize: {}, " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/octet-stream", NiceBytes(Blob.Size()), Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheResult CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); uint64_t Offset = 0; if (Blob.IsWholeFile()) { auto ReadCallback = [&Blob, &Offset](char* buffer, size_t& size, intptr_t) { size = Min(size, Blob.GetSize() - Offset); IoBuffer PayloadRange = IoBuffer(Blob, Offset, size); MutableMemoryView Data(buffer, size); Data.CopyFrom(PayloadRange.GetView()); Offset += size; return true; }; Session.SetReadCallback(cpr::ReadCallback(gsl::narrow(Blob.GetSize()), ReadCallback)); } else { Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); } cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (!Result.Success) { ZEN_WARN( "CloudCacheSession::PutCompressedBlob failed PUT. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-ContentType: '{}', " "ContentSize: {}, " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/x-ue-comp", NiceBytes(Blob.Size()), Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheResult CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); uint64_t SizeLeft = Payload.GetSize(); CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { size = Min(size, SizeLeft); MutableMemoryView Data(buffer, size); Payload.CopyTo(Data, BufferIt); SizeLeft -= size; return true; }; Session.SetReadCallback(cpr::ReadCallback(gsl::narrow(SizeLeft), ReadCallback)); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (!Result.Success) { ZEN_WARN( "CloudCacheSession::PutCompressedBlob failed PUT. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-ContentType: '{}', " "ContentSize: {}, " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/x-ue-comp", NiceBytes(Payload.GetSize()), Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheResult CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) { ZEN_TRACE_CPU("JupiterClient::PutObject"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (!Result.Success) { ZEN_WARN( "CloudCacheSession::PutObject failed PUT. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-ContentType: '{}', " "ContentSize: {}, " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/x-ue-cb", NiceBytes(Object.GetSize()), Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheResult CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::RefExists"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (!Result.Success) { ZEN_WARN( "CloudCacheSession::RefExists failed PUT. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } GetObjectReferencesResult CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer); for (auto& Item : ReferencesResponse["references"sv]) { Result.References.insert(Item.AsHash()); } } else { ZEN_WARN( "CloudCacheSession::GetObjectReferences failed PUT. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-Accept: '{}', " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/x-ue-cb", Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheResult CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) { return CacheTypeExists(Namespace, "blobs"sv, Key); } CloudCacheResult CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) { return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); } CloudCacheResult CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) { return CacheTypeExists(Namespace, "objects"sv, Key); } CloudCacheExistsResult CloudCacheSession::BlobExists(std::string_view Namespace, const std::set& Keys) { return CacheTypeExists(Namespace, "blobs"sv, Keys); } CloudCacheExistsResult CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set& Keys) { return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); } CloudCacheExistsResult CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set& Keys) { return CacheTypeExists(Namespace, "objects"sv, Keys); } std::vector CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector& ChunkHashes) { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl(); Uri << "/api/v1/s/" << Namespace; ZEN_UNUSED(BucketId, ChunkHashes); return {}; } cpr::Session& CloudCacheSession::GetSession() { return m_SessionState->GetSession(); } CloudCacheAccessToken CloudCacheSession::GetAccessToken(bool RefreshToken) { return m_SessionState->GetAccessToken(RefreshToken); } CloudCacheResult CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (!Result.Success) { ZEN_WARN( "CloudCacheSession::CacheTypeExists failed GET. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-Accept: '{}', " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/x-ue-cb", Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } CloudCacheExistsResult CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set& Keys) { ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); ExtendableStringBuilder<256> Body; Body << "["; for (const auto& Key : Keys) { Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; } Body << "]"; ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist"; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption( cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}, {"Content-Type", "application/json"}}); Session.SetOption(cpr::Body(Body.ToString())); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); CloudCacheExistsResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); for (auto& Item : ExistsResponse["needs"sv]) { Result.Needs.insert(Item.AsHash()); } } else { ZEN_WARN( "CloudCacheSession::CacheTypeExists failed GET. " "Elapsed: {} s, " "Uri: '{}', " "Header-Authorization: '{} ', " "Header-Accept: '{}', " "Response.status_code: {}, " "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " "Response.raw_header: '{}'" "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), "application/x-ue-cb", Response.status_code, Response.reason, gsl::narrow(Response.error.code), Response.error.message, Response.raw_header, Response.text); } return Result; } /** * An access token provider that holds a token that will never change. */ class StaticTokenProvider final : public CloudCacheTokenProvider { public: StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {} virtual ~StaticTokenProvider() = default; virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; } private: CloudCacheAccessToken m_Token; }; std::unique_ptr CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token) { return std::make_unique(std::move(Token)); } class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider { public: OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params) { m_Url = std::string(Params.Url); m_ClientId = std::string(Params.ClientId); m_ClientSecret = std::string(Params.ClientSecret); } virtual ~OAuthClientCredentialsTokenProvider() = default; virtual CloudCacheAccessToken AcquireAccessToken() final override { using namespace std::chrono; std::string Body = fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret); cpr::Response Response = cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)}); if (Response.error || Response.status_code != 200) { return {}; } std::string JsonError; json11::Json Json = json11::Json::parse(Response.text, JsonError); if (JsonError.empty() == false) { return {}; } std::string Token = Json["access_token"].string_value(); int64_t ExpiresInSeconds = static_cast(Json["expires_in"].int_value()); CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds); return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime}; } private: std::string m_Url; std::string m_ClientId; std::string m_ClientSecret; }; std::unique_ptr CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params) { return std::make_unique(Params); } class CallbackTokenProvider final : public CloudCacheTokenProvider { public: CallbackTokenProvider(std::function&& Callback) : m_Callback(std::move(Callback)) {} virtual ~CallbackTokenProvider() = default; virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); } private: std::function m_Callback; }; std::unique_ptr CloudCacheTokenProvider::CreateFromCallback(std::function&& Callback) { return std::make_unique(std::move(Callback)); } CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr TokenProvider) : m_Log(zen::logging::Get("jupiter")) , m_ServiceUrl(Options.ServiceUrl) , m_DefaultDdcNamespace(Options.DdcNamespace) , m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) , m_ComputeCluster(Options.ComputeCluster) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) , m_TokenProvider(std::move(TokenProvider)) , m_AssumeHttp2(Options.AssumeHttp2) { ZEN_ASSERT(m_TokenProvider.get() != nullptr); } CloudCacheClient::~CloudCacheClient() { RwLock::ExclusiveLockScope _(m_SessionStateLock); for (auto State : m_SessionStateCache) { delete State; } } CloudCacheAccessToken CloudCacheClient::AcquireAccessToken() { ZEN_TRACE_CPU("JupiterClient::AcquireAccessToken"); return m_TokenProvider->AcquireAccessToken(); } detail::CloudCacheSessionState* CloudCacheClient::AllocSessionState() { detail::CloudCacheSessionState* State = nullptr; bool IsTokenValid = false; { RwLock::ExclusiveLockScope _(m_SessionStateLock); if (m_SessionStateCache.empty() == false) { State = m_SessionStateCache.front(); IsTokenValid = State->m_AccessToken.IsValid(); m_SessionStateCache.pop_front(); } } if (State == nullptr) { State = new detail::CloudCacheSessionState(*this); } State->Reset(m_ConnectTimeout, m_Timeout, m_AssumeHttp2); if (IsTokenValid == false) { State->m_AccessToken = m_TokenProvider->AcquireAccessToken(); } return State; } void CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) { RwLock::ExclusiveLockScope _(m_SessionStateLock); m_SessionStateCache.push_front(State); } } // namespace zen