diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-06 16:42:27 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-11 16:13:36 +0200 |
| commit | 33fa76a35a96cad1865854068e60c1ca0b53864e (patch) | |
| tree | eaf79c48e277286270ac3862ec6897b4bfab4fba /zenserver/upstream | |
| parent | Merge pull request #92 from EpicGames/de/bucket-standalone-temp-file-cleanup (diff) | |
| download | zen-33fa76a35a96cad1865854068e60c1ca0b53864e.tar.xz zen-33fa76a35a96cad1865854068e60c1ca0b53864e.zip | |
parameterize namespace for upstream (first hack)
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/hordecompute.cpp | 20 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 102 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 50 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 66 |
4 files changed, 135 insertions, 103 deletions
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp index dbf86cc13..4d502a193 100644 --- a/zenserver/upstream/hordecompute.cpp +++ b/zenserver/upstream/hordecompute.cpp @@ -198,7 +198,8 @@ namespace detail { } { - PutRefResult RefResult = StorageSession.PutRef("requests"sv, + PutRefResult RefResult = StorageSession.PutRef(StorageSession.Client().BlobStoreNamespace(), + "requests"sv, UpstreamData.TaskId, UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), ZenContentType::kCbObject); @@ -292,7 +293,7 @@ namespace detail { std::set<IoHash> Keys; std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); + CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().BlobStoreNamespace(), Keys); Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", Keys.size(), ExistsResult.Needs.size(), @@ -309,7 +310,7 @@ namespace detail { for (const auto& Hash : ExistsResult.Needs) { - CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash)); + CloudCacheResult Result = Session.PutBlob(Session.Client().BlobStoreNamespace(), Hash, Blobs.at(Hash)); Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -339,7 +340,7 @@ namespace detail { std::set<IoHash> Keys; std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); + CloudCacheExistsResult ExistsResult = Session.ObjectExists(Session.Client().BlobStoreNamespace(), Keys); Log().debug("Queried {} missing objects Need={} Duration={}s Result={}", Keys.size(), ExistsResult.Needs.size(), @@ -356,7 +357,8 @@ namespace detail { for (const auto& Hash : ExistsResult.Needs) { - CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); + CloudCacheResult Result = + Session.PutObject(Session.Client().BlobStoreNamespace(), Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -691,7 +693,8 @@ namespace detail { std::map<IoHash, IoBuffer> BinaryData; { - CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); + CloudCacheResult ObjectRefResult = + Session.GetRef(Session.Client().BlobStoreNamespace(), "responses"sv, ResultHash, ZenContentType::kCbObject); Log().debug("Get ref {} Bytes={} Duration={}s Result={}", ResultHash, ObjectRefResult.Bytes, @@ -718,7 +721,8 @@ namespace detail { std::set<IoHash> NeededData; if (OutputHash != IoHash::Zero) { - GetObjectReferencesResult ObjectReferenceResult = Session.GetObjectReferences(OutputHash); + GetObjectReferencesResult ObjectReferenceResult = + Session.GetObjectReferences(Session.Client().BlobStoreNamespace(), OutputHash); Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}", ResultHash, ObjectReferenceResult.References.size(), @@ -748,7 +752,7 @@ namespace detail { { continue; } - CloudCacheResult BlobResult = Session.GetBlob(Hash); + CloudCacheResult BlobResult = Session.GetBlob(Session.Client().BlobStoreNamespace(), Hash); Log().debug("Get blob {} Bytes={} Duration={}s Result={}", Hash, BlobResult.Bytes, diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 4bec41a29..881798c1f 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -83,12 +83,12 @@ CloudCacheSession::Authenticate() } CloudCacheResult -CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) +CloudCacheSession::GetDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key) { ZEN_TRACE_CPU("HordeClient::GetDerivedData"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << Namespace << "/" << BucketId << "/" << Key; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -115,19 +115,18 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke } CloudCacheResult -CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) +CloudCacheSession::GetDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { - return GetDerivedData(BucketId, Key.ToHexString()); + return GetDerivedData(Namespace, BucketId, Key.ToHexString()); } CloudCacheResult -CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -155,10 +154,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte } CloudCacheResult -CloudCacheSession::GetBlob(const IoHash& Key) +CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) { ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -187,12 +186,12 @@ CloudCacheSession::GetBlob(const IoHash& Key) } CloudCacheResult -CloudCacheSession::GetCompressedBlob(const IoHash& Key) +CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetCompressedBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -220,12 +219,12 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) } CloudCacheResult -CloudCacheSession::GetObject(const IoHash& Key) +CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetObject"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -253,14 +252,14 @@ CloudCacheSession::GetObject(const IoHash& Key) } CloudCacheResult -CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) +CloudCacheSession::PutDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { ZEN_TRACE_CPU("HordeClient::PutDerivedData"); IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size()); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << Namespace << "/" << BucketId << "/" << Key; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -289,13 +288,13 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke } CloudCacheResult -CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) +CloudCacheSession::PutDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) { - return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); + return PutDerivedData(Namespace, BucketId, Key.ToHexString(), DerivedData); } PutRefResult -CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) +CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { ZEN_TRACE_CPU("HordeClient::PutRef"); @@ -304,8 +303,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -356,13 +354,13 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer } FinalizeRefResult -CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) { ZEN_TRACE_CPU("HordeClient::FinalizeRef"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" + << RefHash.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -414,12 +412,12 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con } CloudCacheResult -CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) +CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("HordeClient::PutBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -446,12 +444,12 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) } CloudCacheResult -CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) +CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -478,12 +476,12 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) } CloudCacheResult -CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) +CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) { ZEN_TRACE_CPU("HordeClient::PutObject"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -510,13 +508,12 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) } CloudCacheResult -CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) +CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::RefExists"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" - << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -541,13 +538,12 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) } GetObjectReferencesResult -CloudCacheSession::GetObjectReferences(const IoHash& Key) +CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetObjectReferences"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() - << "/references"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -585,39 +581,39 @@ CloudCacheSession::GetObjectReferences(const IoHash& Key) } CloudCacheResult -CloudCacheSession::BlobExists(const IoHash& Key) +CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) { - return CacheTypeExists("blobs"sv, Key); + return CacheTypeExists(Namespace, "blobs"sv, Key); } CloudCacheResult -CloudCacheSession::CompressedBlobExists(const IoHash& Key) +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) { - return CacheTypeExists("compressed-blobs"sv, Key); + return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); } CloudCacheResult -CloudCacheSession::ObjectExists(const IoHash& Key) +CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) { - return CacheTypeExists("objects"sv, Key); + return CacheTypeExists(Namespace, "objects"sv, Key); } CloudCacheExistsResult -CloudCacheSession::BlobExists(const std::set<IoHash>& Keys) +CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) { - return CacheTypeExists("blobs"sv, Keys); + return CacheTypeExists(Namespace, "blobs"sv, Keys); } CloudCacheExistsResult -CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys) +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) { - return CacheTypeExists("compressed-blobs"sv, Keys); + return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); } CloudCacheExistsResult -CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys) +CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) { - return CacheTypeExists("objects"sv, Keys); + return CacheTypeExists(Namespace, "objects"sv, Keys); } CloudCacheResult @@ -685,11 +681,11 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t } std::vector<IoHash> -CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) +CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace(); + Uri << "/api/v1/s/" << Namespace; ZEN_UNUSED(BucketId, ChunkHashes); @@ -715,12 +711,12 @@ CloudCacheSession::VerifyAccessToken(long StatusCode) } CloudCacheResult -CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -745,7 +741,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) } CloudCacheExistsResult -CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys) +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) { ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); @@ -758,7 +754,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHas Body << "]"; ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exist"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist"; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index cff9a9ef1..a74d4d81d 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -95,38 +95,40 @@ public: ~CloudCacheSession(); CloudCacheResult Authenticate(); - CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key); - CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key); - CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType); - CloudCacheResult GetBlob(const IoHash& Key); - CloudCacheResult GetCompressedBlob(const IoHash& Key); - CloudCacheResult GetObject(const IoHash& Key); + CloudCacheResult GetDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key); + CloudCacheResult GetDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); + CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); - CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); - CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); - PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); - CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob); - CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); - CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object); + CloudCacheResult PutDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); + CloudCacheResult PutDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); + PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); - FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); - CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); + CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); - GetObjectReferencesResult GetObjectReferences(const IoHash& Key); + GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); - CloudCacheResult BlobExists(const IoHash& Key); - CloudCacheResult CompressedBlobExists(const IoHash& Key); - CloudCacheResult ObjectExists(const IoHash& Key); + CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key); - CloudCacheExistsResult BlobExists(const std::set<IoHash>& Keys); - CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys); - CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys); + CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); CloudCacheResult PostComputeTasks(IoBuffer TasksData); CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); - std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + + CloudCacheClient& Client() { return *m_CacheClient; }; private: inline spdlog::logger& Log() { return m_Log; } @@ -134,9 +136,9 @@ private: CloudCacheAccessToken GetAccessToken(bool RefreshToken = false); bool VerifyAccessToken(long StatusCode); - CloudCacheResult CacheTypeExists(std::string_view TypeId, const IoHash& Key); + CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); - CloudCacheExistsResult CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys); + CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); spdlog::logger& m_Log; RefPtr<CloudCacheClient> m_CacheClient; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c870e0773..2b52b8efa 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -175,11 +175,15 @@ namespace detail { if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { - Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); + Result = + Session.GetDerivedData(CacheKey.Namespace, CacheKey.Bucket, CacheKey.Hash); // Session.Client().DdcNamespace(), } else if (Type == ZenContentType::kCompressedBinary) { - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + Result = Session.GetRef(CacheKey.Namespace, + CacheKey.Bucket, + CacheKey.Hash, + ZenContentType::kCbObject); // Session.Client().BlobStoreNamespace(), if (Result.Success) { @@ -192,7 +196,8 @@ namespace detail { CacheRecord.IterateAttachments( [&Session, &Result, &ContentBuffer, &NumAttachments](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + CloudCacheResult AttachmentResult = + Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash()); Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; @@ -217,7 +222,10 @@ namespace detail { else { const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); + Result = Session.GetRef(CacheKey.Namespace, + CacheKey.Bucket, + CacheKey.Hash, + AcceptType); // Session.Client().BlobStoreNamespace() if (Result.Success && Type == ZenContentType::kCbPackage) { @@ -229,7 +237,8 @@ namespace detail { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + CloudCacheResult AttachmentResult = + Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash()); Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; @@ -294,7 +303,10 @@ namespace detail { if (!Result.Error) { - CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + CloudCacheResult RefResult = Session.GetRef(CacheKey.Namespace, + CacheKey.Bucket, + CacheKey.Hash, + ZenContentType::kCbObject); // Session.Client().BlobStoreNamespace() AppendResult(RefResult, Result); m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -306,7 +318,8 @@ namespace detail { { Record = LoadCompactBinaryObject(RefResult.Response); Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + CloudCacheResult BlobResult = + Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash()); AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -336,7 +349,7 @@ namespace detail { try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId); + const CloudCacheResult Result = Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -376,7 +389,7 @@ namespace detail { CompressedBuffer Compressed; if (!Result.Error) { - const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); + const CloudCacheResult BlobResult = Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), Request.ChunkId); Payload = BlobResult.Response; AppendResult(BlobResult, Result); @@ -424,11 +437,18 @@ namespace detail { { if (m_UseLegacyDdc) { - Result = Session.PutDerivedData(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); + Result = Session.PutDerivedData(CacheRecord.Key.Namespace, // Session.Client().BlobStoreNamespace(), + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue); } else { - Result = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kBinary); + Result = Session.PutRef(CacheRecord.Key.Namespace, // Session.Client().BlobStoreNamespace(), + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue, + ZenContentType::kBinary); } } @@ -539,7 +559,7 @@ namespace detail { CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer); + BlobResult = Session.PutCompressedBlob(Session.Client().BlobStoreNamespace(), ValueContentId, BlobBuffer); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -560,7 +580,11 @@ namespace detail { PutRefResult RefResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) { - RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); + RefResult = Session.PutRef(Key.Namespace, // Seesion.Client().BlobStoreNamespace(), + Key.Bucket, + Key.Hash, + ObjectBuffer, + ZenContentType::kCbObject); } m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -580,8 +604,9 @@ namespace detail { return {.Reason = std::move(Reason), .Success = false}; } - const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); - FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); + const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); + FinalizeRefResult FinalizeResult = + Session.FinalizeRef(Key.Namespace, Key.Bucket, Key.Hash, RefHash); // Session.Client().BlobStoreNamespace(), m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); @@ -599,7 +624,8 @@ namespace detail { return {.Reason = std::move(Reason), .Success = false}; } - FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); + FinalizeResult = + Session.FinalizeRef(Key.Namespace, Key.Bucket, Key.Hash, RefHash); // Session.Client().BlobStoreNamespace(), m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); @@ -761,6 +787,7 @@ namespace detail { const CacheKey& Key = Request->Key; BatchRequest.BeginObject("Key"sv); { + BatchRequest << "Namespace"sv << Key.Namespace; BatchRequest << "Bucket"sv << Key.Bucket; BatchRequest << "Hash"sv << Key.Hash; } @@ -871,6 +898,7 @@ namespace detail { BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); + BatchRequest << "Namespace"sv << Request.Key.Namespace; BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); @@ -1042,6 +1070,7 @@ namespace detail { const CacheKey& Key = CacheRecord.Key; BatchWriter.BeginObject("Key"sv); { + BatchWriter << "Namespace"sv << Key.Namespace; BatchWriter << "Bucket"sv << Key.Bucket; BatchWriter << "Hash"sv << Key.Hash; } @@ -1517,7 +1546,7 @@ private: ZenCacheValue CacheValue; std::vector<IoBuffer> Payloads; - if (!m_CacheStore.Get(ZenCacheStore::DefaultNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1531,7 +1560,8 @@ private: } else { - ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS", + ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", + CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, ValueContentId); |