aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-09-23 13:56:48 +0200
committerPer Larsson <[email protected]>2021-09-23 13:56:48 +0200
commit85152e23a1d64f2caabf8467572e052f296133f3 (patch)
tree1c3e52d17985c99a4538734fcff314ac478c3071
parentUse /check/health instead of /test/hello. (diff)
downloadzen-85152e23a1d64f2caabf8467572e052f296133f3.tar.xz
zen-85152e23a1d64f2caabf8467572e052f296133f3.zip
Respect Jupiter auth token expiration time.
-rw-r--r--zenserver/upstream/jupiter.cpp208
-rw-r--r--zenserver/upstream/jupiter.h54
2 files changed, 150 insertions, 112 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 14da8cbcc..6eaa6423b 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -40,22 +40,32 @@ namespace detail {
CloudCacheSessionState(CloudCacheClient& Client) : OwnerClient(Client) {}
~CloudCacheSessionState() {}
- void Reset()
+ const CloudCacheAccessToken& GetAccessToken()
{
- std::string Auth;
- OwnerClient.AcquireAccessToken(Auth);
+ if (!AccessToken.IsValid())
+ {
+ AccessToken = OwnerClient.AcquireAccessToken();
+ }
+ return AccessToken;
+ }
+ void InvalidateAccessToken() { AccessToken = {}; }
+
+ void Reset()
+ {
Session.SetBody({});
- Session.SetOption(cpr::Header{{"Authorization", Auth}});
+ Session.SetHeader({});
+ AccessToken = GetAccessToken();
}
- CloudCacheClient& OwnerClient;
- cpr::Session Session;
+ CloudCacheClient& OwnerClient;
+ CloudCacheAccessToken AccessToken;
+ cpr::Session Session;
};
} // namespace detail
-CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient)
+CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
{
m_SessionState = m_CacheClient->AllocSessionState();
}
@@ -68,16 +78,18 @@ CloudCacheSession::~CloudCacheSession()
CloudCacheResult
CloudCacheSession::Authenticate()
{
- std::string Auth;
- const bool Success = m_CacheClient->AcquireAccessToken(Auth);
- return {.Success = Success};
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ return {.Success = AccessToken.IsValid()};
}
CloudCacheResult
CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key)
{
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw";
@@ -85,7 +97,7 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -94,6 +106,10 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -110,9 +126,13 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key)
CloudCacheResult
CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
@@ -121,7 +141,7 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", ContentType}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -130,6 +150,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -140,8 +164,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
CloudCacheResult
CloudCacheSession::GetCompressedBlob(const IoHash& Key)
{
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
@@ -149,7 +176,7 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -168,10 +195,13 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
CloudCacheResult
CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData)
{
- IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size());
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
+ IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size());
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
@@ -179,8 +209,9 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
auto& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
+ {"X-Jupiter-IoHash", Hash.ToHexString()},
+ {"Content-Type", "application/octet-stream"}});
Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()});
cpr::Response Response = Session.Put();
@@ -190,6 +221,10 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
@@ -205,11 +240,15 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key,
CloudCacheResult
CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
@@ -218,7 +257,8 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}});
+ Session.SetOption(
+ cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}});
Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()});
cpr::Response Response = Session.Put();
@@ -228,6 +268,10 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
@@ -237,8 +281,11 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
CloudCacheResult
CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
{
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
@@ -246,7 +293,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
cpr::Response Response = Session.Put();
@@ -256,6 +303,10 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
@@ -274,22 +325,21 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>&
return {};
}
-//////////////////////////////////////////////////////////////////////////
-
-std::string
-CloudCacheAccessToken::GetAuthorizationHeaderValue()
+const CloudCacheAccessToken&
+CloudCacheSession::GetAccessToken()
{
- RwLock::SharedLockScope _(m_Lock);
-
- return "Bearer {}"_format(m_Token);
+ return m_SessionState->GetAccessToken();
}
-inline void
-CloudCacheAccessToken::SetToken(std::string_view Token)
+bool
+CloudCacheSession::VerifyAccessToken(long StatusCode)
{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Token = Token;
- ++m_Serial;
+ if (StatusCode == 401)
+ {
+ m_SessionState->InvalidateAccessToken();
+ return false;
+ }
+ return true;
}
//////////////////////////////////////////////////////////////////////////
@@ -354,60 +404,33 @@ CloudCacheClient::~CloudCacheClient()
}
}
-bool
-CloudCacheClient::AcquireAccessToken(std::string& AuthorizationHeaderValue)
+CloudCacheAccessToken
+CloudCacheClient::AcquireAccessToken()
{
- // TODO: check for expiration
-
- if (!m_IsValid)
- {
- ExtendableStringBuilder<128> OAuthFormData;
- OAuthFormData << "client_id=" << m_OAuthClientId
- << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret;
-
- const uint32_t CurrentSerial = m_AccessToken.GetSerial();
+ using namespace std::chrono;
- static RwLock AuthMutex;
- RwLock::ExclusiveLockScope _(AuthMutex);
-
- // Protect against redundant authentication operations
- if (m_AccessToken.GetSerial() != CurrentSerial)
- {
- // TODO: this could verify that the token is actually valid and retry if not?
-
- return true;
- }
+ ExtendableStringBuilder<128> OAuthFormData;
+ OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret;
- std::string data{OAuthFormData};
+ std::string Body{OAuthFormData};
- cpr::Response Response =
- cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{data});
+ cpr::Response Response =
+ cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{Body});
- std::string Body{std::move(Response.text)};
+ Body = std::move(Response.text);
- // Parse JSON response
-
- std::string JsonError;
- json11::Json JsonResponse = json11::Json::parse(Body, /* out */ JsonError);
- if (!JsonError.empty())
- {
- ZEN_WARN("failed to parse OAuth response: '{}'", JsonError);
-
- return false;
- }
-
- std::string AccessToken = JsonResponse["access_token"].string_value();
- int ExpiryTimeSeconds = JsonResponse["expires_in"].int_value();
- ZEN_UNUSED(ExpiryTimeSeconds);
-
- m_AccessToken.SetToken(AccessToken);
-
- m_IsValid = true;
+ std::string JsonError;
+ json11::Json JsonResponse = json11::Json::parse(Body, JsonError);
+ if (!JsonError.empty())
+ {
+ return {};
}
- AuthorizationHeaderValue = m_AccessToken.GetAuthorizationHeaderValue();
+ std::string AccessToken = std::string("Bearer ") + JsonResponse["access_token"].string_value();
+ int64_t ExpiresInSeconds = static_cast<int64_t>(JsonResponse["expires_in"].int_value());
+ steady_clock::time_point ExpireTime = steady_clock::now() + seconds(ExpiresInSeconds);
- return true;
+ return {std::move(AccessToken), ExpireTime};
}
detail::CloudCacheSessionState*
@@ -434,8 +457,19 @@ CloudCacheClient::AllocSessionState()
void
CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State)
{
+ const bool IsTokenValid = State->AccessToken.IsValid();
+
RwLock::ExclusiveLockScope _(m_SessionStateLock);
m_SessionStateCache.push_front(State);
+
+ // Invalidate all cached access tokens if any one fails
+ if (!IsTokenValid)
+ {
+ for (auto& CachedState : m_SessionStateCache)
+ {
+ CachedState->AccessToken = {};
+ }
+ }
}
} // namespace zen
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 94e7e7680..868a7b099 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -8,6 +8,7 @@
#include <zenhttp/httpserver.h>
#include <atomic>
+#include <chrono>
#include <list>
#include <memory>
#include <vector>
@@ -29,15 +30,17 @@ class CbObjectView;
*/
struct CloudCacheAccessToken
{
- std::string GetAuthorizationHeaderValue();
- void SetToken(std::string_view Token);
+ static constexpr int64_t ExpireMarginInSeconds = 30;
- inline uint32_t GetSerial() const { return m_Serial.load(std::memory_order::memory_order_relaxed); }
+ std::string Value;
+ std::chrono::steady_clock::time_point ExpireTime;
-private:
- RwLock m_Lock;
- std::string m_Token;
- std::atomic<uint32_t> m_Serial;
+ bool IsValid() const
+ {
+ return !Value.empty() &&
+ ExpireMarginInSeconds <
+ std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - std::chrono::steady_clock::now()).count();
+ }
};
struct CloudCacheResult
@@ -60,7 +63,7 @@ struct CloudCacheResult
class CloudCacheSession
{
public:
- CloudCacheSession(CloudCacheClient* OuterClient);
+ CloudCacheSession(CloudCacheClient* CacheClient);
~CloudCacheSession();
CloudCacheResult Authenticate();
@@ -77,7 +80,9 @@ public:
std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
private:
- inline spdlog::logger& Log() { return m_Log; }
+ inline spdlog::logger& Log() { return m_Log; }
+ const CloudCacheAccessToken& GetAccessToken();
+ bool VerifyAccessToken(long StatusCode);
spdlog::logger& m_Log;
RefPtr<CloudCacheClient> m_CacheClient;
@@ -104,26 +109,25 @@ public:
CloudCacheClient(const CloudCacheClientOptions& Options);
~CloudCacheClient();
- bool AcquireAccessToken(std::string& AuthorizationHeaderValue);
- std::string_view DdcNamespace() const { return m_DdcNamespace; }
- std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
- std::string_view ServiceUrl() const { return m_ServiceUrl; }
- bool IsValid() const { return m_IsValid; }
+ CloudCacheAccessToken AcquireAccessToken();
+ std::string_view DdcNamespace() const { return m_DdcNamespace; }
+ std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
+ std::string_view ServiceUrl() const { return m_ServiceUrl; }
+ bool IsValid() const { return m_IsValid; }
spdlog::logger& Logger() { return m_Log; }
private:
- spdlog::logger& m_Log;
- std::string m_ServiceUrl;
- std::string m_OAuthDomain;
- std::string m_OAuthUriPath;
- std::string m_OAuthFullUri;
- std::string m_DdcNamespace;
- std::string m_BlobStoreNamespace;
- std::string m_OAuthClientId;
- std::string m_OAuthSecret;
- CloudCacheAccessToken m_AccessToken;
- bool m_IsValid = false;
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
+ std::string m_OAuthDomain;
+ std::string m_OAuthUriPath;
+ std::string m_OAuthFullUri;
+ std::string m_DdcNamespace;
+ std::string m_BlobStoreNamespace;
+ std::string m_OAuthClientId;
+ std::string m_OAuthSecret;
+ bool m_IsValid = false;
RwLock m_SessionStateLock;
std::list<detail::CloudCacheSessionState*> m_SessionStateCache;