diff options
| author | Per Larsson <[email protected]> | 2021-09-23 13:56:48 +0200 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-09-23 13:56:48 +0200 |
| commit | 85152e23a1d64f2caabf8467572e052f296133f3 (patch) | |
| tree | 1c3e52d17985c99a4538734fcff314ac478c3071 | |
| parent | Use /check/health instead of /test/hello. (diff) | |
| download | zen-85152e23a1d64f2caabf8467572e052f296133f3.tar.xz zen-85152e23a1d64f2caabf8467572e052f296133f3.zip | |
Respect Jupiter auth token expiration time.
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 208 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 54 |
2 files changed, 150 insertions, 112 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 14da8cbcc..6eaa6423b 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -40,22 +40,32 @@ namespace detail { CloudCacheSessionState(CloudCacheClient& Client) : OwnerClient(Client) {} ~CloudCacheSessionState() {} - void Reset() + const CloudCacheAccessToken& GetAccessToken() { - std::string Auth; - OwnerClient.AcquireAccessToken(Auth); + if (!AccessToken.IsValid()) + { + AccessToken = OwnerClient.AcquireAccessToken(); + } + return AccessToken; + } + void InvalidateAccessToken() { AccessToken = {}; } + + void Reset() + { Session.SetBody({}); - Session.SetOption(cpr::Header{{"Authorization", Auth}}); + Session.SetHeader({}); + AccessToken = GetAccessToken(); } - CloudCacheClient& OwnerClient; - cpr::Session Session; + CloudCacheClient& OwnerClient; + CloudCacheAccessToken AccessToken; + cpr::Session Session; }; } // namespace detail -CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient) +CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) { m_SessionState = m_CacheClient->AllocSessionState(); } @@ -68,16 +78,18 @@ CloudCacheSession::~CloudCacheSession() CloudCacheResult CloudCacheSession::Authenticate() { - std::string Auth; - const bool Success = m_CacheClient->AcquireAccessToken(Auth); - return {.Success = Success}; + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + return {.Success = AccessToken.IsValid()}; } CloudCacheResult CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) { - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw"; @@ -85,7 +97,7 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -94,6 +106,10 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -110,9 +126,13 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) CloudCacheResult CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" @@ -121,7 +141,7 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", ContentType}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -130,6 +150,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -140,8 +164,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte CloudCacheResult CloudCacheSession::GetCompressedBlob(const IoHash& Key) { - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); @@ -149,7 +176,7 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -168,10 +195,13 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) CloudCacheResult CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { - IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size()); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); + IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size()); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; @@ -179,8 +209,9 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke auto& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, + {"X-Jupiter-IoHash", Hash.ToHexString()}, + {"Content-Type", "application/octet-stream"}}); Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()}); cpr::Response Response = Session.Put(); @@ -190,6 +221,10 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, @@ -205,11 +240,15 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, CloudCacheResult CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" @@ -218,7 +257,8 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); + Session.SetOption( + cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); cpr::Response Response = Session.Put(); @@ -228,6 +268,10 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, @@ -237,8 +281,11 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer CloudCacheResult CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) { - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); @@ -246,7 +293,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); cpr::Response Response = Session.Put(); @@ -256,6 +303,10 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, @@ -274,22 +325,21 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& return {}; } -////////////////////////////////////////////////////////////////////////// - -std::string -CloudCacheAccessToken::GetAuthorizationHeaderValue() +const CloudCacheAccessToken& +CloudCacheSession::GetAccessToken() { - RwLock::SharedLockScope _(m_Lock); - - return "Bearer {}"_format(m_Token); + return m_SessionState->GetAccessToken(); } -inline void -CloudCacheAccessToken::SetToken(std::string_view Token) +bool +CloudCacheSession::VerifyAccessToken(long StatusCode) { - RwLock::ExclusiveLockScope _(m_Lock); - m_Token = Token; - ++m_Serial; + if (StatusCode == 401) + { + m_SessionState->InvalidateAccessToken(); + return false; + } + return true; } ////////////////////////////////////////////////////////////////////////// @@ -354,60 +404,33 @@ CloudCacheClient::~CloudCacheClient() } } -bool -CloudCacheClient::AcquireAccessToken(std::string& AuthorizationHeaderValue) +CloudCacheAccessToken +CloudCacheClient::AcquireAccessToken() { - // TODO: check for expiration - - if (!m_IsValid) - { - ExtendableStringBuilder<128> OAuthFormData; - OAuthFormData << "client_id=" << m_OAuthClientId - << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; - - const uint32_t CurrentSerial = m_AccessToken.GetSerial(); + using namespace std::chrono; - static RwLock AuthMutex; - RwLock::ExclusiveLockScope _(AuthMutex); - - // Protect against redundant authentication operations - if (m_AccessToken.GetSerial() != CurrentSerial) - { - // TODO: this could verify that the token is actually valid and retry if not? - - return true; - } + ExtendableStringBuilder<128> OAuthFormData; + OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; - std::string data{OAuthFormData}; + std::string Body{OAuthFormData}; - cpr::Response Response = - cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{data}); + cpr::Response Response = + cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{Body}); - std::string Body{std::move(Response.text)}; + Body = std::move(Response.text); - // Parse JSON response - - std::string JsonError; - json11::Json JsonResponse = json11::Json::parse(Body, /* out */ JsonError); - if (!JsonError.empty()) - { - ZEN_WARN("failed to parse OAuth response: '{}'", JsonError); - - return false; - } - - std::string AccessToken = JsonResponse["access_token"].string_value(); - int ExpiryTimeSeconds = JsonResponse["expires_in"].int_value(); - ZEN_UNUSED(ExpiryTimeSeconds); - - m_AccessToken.SetToken(AccessToken); - - m_IsValid = true; + std::string JsonError; + json11::Json JsonResponse = json11::Json::parse(Body, JsonError); + if (!JsonError.empty()) + { + return {}; } - AuthorizationHeaderValue = m_AccessToken.GetAuthorizationHeaderValue(); + std::string AccessToken = std::string("Bearer ") + JsonResponse["access_token"].string_value(); + int64_t ExpiresInSeconds = static_cast<int64_t>(JsonResponse["expires_in"].int_value()); + steady_clock::time_point ExpireTime = steady_clock::now() + seconds(ExpiresInSeconds); - return true; + return {std::move(AccessToken), ExpireTime}; } detail::CloudCacheSessionState* @@ -434,8 +457,19 @@ CloudCacheClient::AllocSessionState() void CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) { + const bool IsTokenValid = State->AccessToken.IsValid(); + RwLock::ExclusiveLockScope _(m_SessionStateLock); m_SessionStateCache.push_front(State); + + // Invalidate all cached access tokens if any one fails + if (!IsTokenValid) + { + for (auto& CachedState : m_SessionStateCache) + { + CachedState->AccessToken = {}; + } + } } } // namespace zen diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 94e7e7680..868a7b099 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -8,6 +8,7 @@ #include <zenhttp/httpserver.h> #include <atomic> +#include <chrono> #include <list> #include <memory> #include <vector> @@ -29,15 +30,17 @@ class CbObjectView; */ struct CloudCacheAccessToken { - std::string GetAuthorizationHeaderValue(); - void SetToken(std::string_view Token); + static constexpr int64_t ExpireMarginInSeconds = 30; - inline uint32_t GetSerial() const { return m_Serial.load(std::memory_order::memory_order_relaxed); } + std::string Value; + std::chrono::steady_clock::time_point ExpireTime; -private: - RwLock m_Lock; - std::string m_Token; - std::atomic<uint32_t> m_Serial; + bool IsValid() const + { + return !Value.empty() && + ExpireMarginInSeconds < + std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - std::chrono::steady_clock::now()).count(); + } }; struct CloudCacheResult @@ -60,7 +63,7 @@ struct CloudCacheResult class CloudCacheSession { public: - CloudCacheSession(CloudCacheClient* OuterClient); + CloudCacheSession(CloudCacheClient* CacheClient); ~CloudCacheSession(); CloudCacheResult Authenticate(); @@ -77,7 +80,9 @@ public: std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); private: - inline spdlog::logger& Log() { return m_Log; } + inline spdlog::logger& Log() { return m_Log; } + const CloudCacheAccessToken& GetAccessToken(); + bool VerifyAccessToken(long StatusCode); spdlog::logger& m_Log; RefPtr<CloudCacheClient> m_CacheClient; @@ -104,26 +109,25 @@ public: CloudCacheClient(const CloudCacheClientOptions& Options); ~CloudCacheClient(); - bool AcquireAccessToken(std::string& AuthorizationHeaderValue); - std::string_view DdcNamespace() const { return m_DdcNamespace; } - std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } - std::string_view ServiceUrl() const { return m_ServiceUrl; } - bool IsValid() const { return m_IsValid; } + CloudCacheAccessToken AcquireAccessToken(); + std::string_view DdcNamespace() const { return m_DdcNamespace; } + std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } + std::string_view ServiceUrl() const { return m_ServiceUrl; } + bool IsValid() const { return m_IsValid; } spdlog::logger& Logger() { return m_Log; } private: - spdlog::logger& m_Log; - std::string m_ServiceUrl; - std::string m_OAuthDomain; - std::string m_OAuthUriPath; - std::string m_OAuthFullUri; - std::string m_DdcNamespace; - std::string m_BlobStoreNamespace; - std::string m_OAuthClientId; - std::string m_OAuthSecret; - CloudCacheAccessToken m_AccessToken; - bool m_IsValid = false; + spdlog::logger& m_Log; + std::string m_ServiceUrl; + std::string m_OAuthDomain; + std::string m_OAuthUriPath; + std::string m_OAuthFullUri; + std::string m_DdcNamespace; + std::string m_BlobStoreNamespace; + std::string m_OAuthClientId; + std::string m_OAuthSecret; + bool m_IsValid = false; RwLock m_SessionStateLock; std::list<detail::CloudCacheSessionState*> m_SessionStateCache; |