aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-12 12:18:21 +0200
committerGitHub <[email protected]>2022-05-12 12:18:21 +0200
commit0fc71fc55feb0bfa98c1e6a24f8d8485859dfc70 (patch)
tree2515a259075a46745e71b62abfadee293eefe9c8 /zenserver/upstream
parentMerge pull request #92 from EpicGames/de/bucket-standalone-temp-file-cleanup (diff)
parentuse string::compare in caseSensitiveCompareStrings (diff)
downloadzen-actions_updates.tar.xz
zen-actions_updates.zip
Merge pull request #93 from EpicGames/de/namespaces-continuedv1.0.1.7actions_updates
De/namespaces continued
Diffstat (limited to 'zenserver/upstream')
-rw-r--r--zenserver/upstream/hordecompute.cpp20
-rw-r--r--zenserver/upstream/jupiter.cpp106
-rw-r--r--zenserver/upstream/jupiter.h58
-rw-r--r--zenserver/upstream/upstreamcache.cpp108
4 files changed, 164 insertions, 128 deletions
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp
index dbf86cc13..2ec24b303 100644
--- a/zenserver/upstream/hordecompute.cpp
+++ b/zenserver/upstream/hordecompute.cpp
@@ -198,7 +198,8 @@ namespace detail {
}
{
- PutRefResult RefResult = StorageSession.PutRef("requests"sv,
+ PutRefResult RefResult = StorageSession.PutRef(StorageSession.Client().DefaultBlobStoreNamespace(),
+ "requests"sv,
UpstreamData.TaskId,
UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(),
ZenContentType::kCbObject);
@@ -292,7 +293,7 @@ namespace detail {
std::set<IoHash> Keys;
std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
- CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys);
+ CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys);
Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}",
Keys.size(),
ExistsResult.Needs.size(),
@@ -309,7 +310,7 @@ namespace detail {
for (const auto& Hash : ExistsResult.Needs)
{
- CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash));
+ CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, Blobs.at(Hash));
Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
@@ -339,7 +340,7 @@ namespace detail {
std::set<IoHash> Keys;
std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
- CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys);
+ CloudCacheExistsResult ExistsResult = Session.ObjectExists(Session.Client().DefaultBlobStoreNamespace(), Keys);
Log().debug("Queried {} missing objects Need={} Duration={}s Result={}",
Keys.size(),
ExistsResult.Needs.size(),
@@ -356,7 +357,8 @@ namespace detail {
for (const auto& Hash : ExistsResult.Needs)
{
- CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer());
+ CloudCacheResult Result =
+ Session.PutObject(Session.Client().DefaultBlobStoreNamespace(), Hash, Objects.at(Hash).GetBuffer().AsIoBuffer());
Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
@@ -691,7 +693,8 @@ namespace detail {
std::map<IoHash, IoBuffer> BinaryData;
{
- CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject);
+ CloudCacheResult ObjectRefResult =
+ Session.GetRef(Session.Client().DefaultBlobStoreNamespace(), "responses"sv, ResultHash, ZenContentType::kCbObject);
Log().debug("Get ref {} Bytes={} Duration={}s Result={}",
ResultHash,
ObjectRefResult.Bytes,
@@ -718,7 +721,8 @@ namespace detail {
std::set<IoHash> NeededData;
if (OutputHash != IoHash::Zero)
{
- GetObjectReferencesResult ObjectReferenceResult = Session.GetObjectReferences(OutputHash);
+ GetObjectReferencesResult ObjectReferenceResult =
+ Session.GetObjectReferences(Session.Client().DefaultBlobStoreNamespace(), OutputHash);
Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}",
ResultHash,
ObjectReferenceResult.References.size(),
@@ -748,7 +752,7 @@ namespace detail {
{
continue;
}
- CloudCacheResult BlobResult = Session.GetBlob(Hash);
+ CloudCacheResult BlobResult = Session.GetBlob(Session.Client().DefaultBlobStoreNamespace(), Hash);
Log().debug("Get blob {} Bytes={} Duration={}s Result={}",
Hash,
BlobResult.Bytes,
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 4bec41a29..ddc6c49d2 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -83,12 +83,12 @@ CloudCacheSession::Authenticate()
}
CloudCacheResult
-CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key)
+CloudCacheSession::GetDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key)
{
ZEN_TRACE_CPU("HordeClient::GetDerivedData");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << Namespace << "/" << BucketId << "/" << Key;
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -115,19 +115,18 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
}
CloudCacheResult
-CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key)
+CloudCacheSession::GetDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
{
- return GetDerivedData(BucketId, Key.ToHexString());
+ return GetDerivedData(Namespace, BucketId, Key.ToHexString());
}
CloudCacheResult
-CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
+CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
{
const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
- << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -155,10 +154,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
}
CloudCacheResult
-CloudCacheSession::GetBlob(const IoHash& Key)
+CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
{
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -187,12 +186,12 @@ CloudCacheSession::GetBlob(const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::GetCompressedBlob(const IoHash& Key)
+CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key)
{
ZEN_TRACE_CPU("HordeClient::GetCompressedBlob");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -220,12 +219,12 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::GetObject(const IoHash& Key)
+CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
{
ZEN_TRACE_CPU("HordeClient::GetObject");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -253,14 +252,14 @@ CloudCacheSession::GetObject(const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData)
+CloudCacheSession::PutDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key, IoBuffer DerivedData)
{
ZEN_TRACE_CPU("HordeClient::PutDerivedData");
IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size());
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << Namespace << "/" << BucketId << "/" << Key;
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -289,13 +288,13 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
}
CloudCacheResult
-CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData)
+CloudCacheSession::PutDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData)
{
- return PutDerivedData(BucketId, Key.ToHexString(), DerivedData);
+ return PutDerivedData(Namespace, BucketId, Key.ToHexString(), DerivedData);
}
PutRefResult
-CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
+CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
{
ZEN_TRACE_CPU("HordeClient::PutRef");
@@ -304,8 +303,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
- << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -356,13 +354,13 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
}
FinalizeRefResult
-CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
+CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
{
ZEN_TRACE_CPU("HordeClient::FinalizeRef");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
- << Key.ToHexString() << "/finalize/" << RefHash.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/"
+ << RefHash.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -414,12 +412,12 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con
}
CloudCacheResult
-CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob)
+CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
{
ZEN_TRACE_CPU("HordeClient::PutBlob");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -446,12 +444,12 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob)
}
CloudCacheResult
-CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
+CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
{
ZEN_TRACE_CPU("HordeClient::PutCompressedBlob");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -478,12 +476,12 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
}
CloudCacheResult
-CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object)
+CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
{
ZEN_TRACE_CPU("HordeClient::PutObject");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -510,13 +508,12 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object)
}
CloudCacheResult
-CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
+CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
{
ZEN_TRACE_CPU("HordeClient::RefExists");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
- << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -541,13 +538,12 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
}
GetObjectReferencesResult
-CloudCacheSession::GetObjectReferences(const IoHash& Key)
+CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
{
ZEN_TRACE_CPU("HordeClient::GetObjectReferences");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString()
- << "/references";
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references";
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -585,39 +581,39 @@ CloudCacheSession::GetObjectReferences(const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::BlobExists(const IoHash& Key)
+CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key)
{
- return CacheTypeExists("blobs"sv, Key);
+ return CacheTypeExists(Namespace, "blobs"sv, Key);
}
CloudCacheResult
-CloudCacheSession::CompressedBlobExists(const IoHash& Key)
+CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key)
{
- return CacheTypeExists("compressed-blobs"sv, Key);
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Key);
}
CloudCacheResult
-CloudCacheSession::ObjectExists(const IoHash& Key)
+CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key)
{
- return CacheTypeExists("objects"sv, Key);
+ return CacheTypeExists(Namespace, "objects"sv, Key);
}
CloudCacheExistsResult
-CloudCacheSession::BlobExists(const std::set<IoHash>& Keys)
+CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
{
- return CacheTypeExists("blobs"sv, Keys);
+ return CacheTypeExists(Namespace, "blobs"sv, Keys);
}
CloudCacheExistsResult
-CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys)
+CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
{
- return CacheTypeExists("compressed-blobs"sv, Keys);
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys);
}
CloudCacheExistsResult
-CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys)
+CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys)
{
- return CacheTypeExists("objects"sv, Keys);
+ return CacheTypeExists(Namespace, "objects"sv, Keys);
}
CloudCacheResult
@@ -685,11 +681,11 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t
}
std::vector<IoHash>
-CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
+CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
{
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace();
+ Uri << "/api/v1/s/" << Namespace;
ZEN_UNUSED(BucketId, ChunkHashes);
@@ -715,12 +711,12 @@ CloudCacheSession::VerifyAccessToken(long StatusCode)
}
CloudCacheResult
-CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key)
+CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
{
ZEN_TRACE_CPU("HordeClient::CacheTypeExists");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -745,7 +741,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key)
}
CloudCacheExistsResult
-CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys)
+CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
{
ZEN_TRACE_CPU("HordeClient::CacheTypeExists");
@@ -758,7 +754,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHas
Body << "]";
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exist";
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist";
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -894,8 +890,8 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken(
CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider)
: m_Log(zen::logging::Get("jupiter"))
, m_ServiceUrl(Options.ServiceUrl)
-, m_DdcNamespace(Options.DdcNamespace)
-, m_BlobStoreNamespace(Options.BlobStoreNamespace)
+, m_DefaultDdcNamespace(Options.DdcNamespace)
+, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
, m_ComputeCluster(Options.ComputeCluster)
, m_ConnectTimeout(Options.ConnectTimeout)
, m_Timeout(Options.Timeout)
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index cff9a9ef1..3d9e6ea7b 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -95,38 +95,40 @@ public:
~CloudCacheSession();
CloudCacheResult Authenticate();
- CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key);
- CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key);
- CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
- CloudCacheResult GetBlob(const IoHash& Key);
- CloudCacheResult GetCompressedBlob(const IoHash& Key);
- CloudCacheResult GetObject(const IoHash& Key);
+ CloudCacheResult GetDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key);
+ CloudCacheResult GetDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
+ CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
+ CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData);
- CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData);
- PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
- CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob);
- CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob);
- CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object);
+ CloudCacheResult PutDerivedData(std::string_view Namespace, std::string_view BucketId, std::string_view Key, IoBuffer DerivedData);
+ CloudCacheResult PutDerivedData(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData);
+ PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
- FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
+ FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
- CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key);
+ CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
- GetObjectReferencesResult GetObjectReferences(const IoHash& Key);
+ GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult BlobExists(const IoHash& Key);
- CloudCacheResult CompressedBlobExists(const IoHash& Key);
- CloudCacheResult ObjectExists(const IoHash& Key);
+ CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key);
- CloudCacheExistsResult BlobExists(const std::set<IoHash>& Keys);
- CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys);
- CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys);
+ CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
CloudCacheResult PostComputeTasks(IoBuffer TasksData);
CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0);
- std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+ std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+
+ CloudCacheClient& Client() { return *m_CacheClient; };
private:
inline spdlog::logger& Log() { return m_Log; }
@@ -134,9 +136,9 @@ private:
CloudCacheAccessToken GetAccessToken(bool RefreshToken = false);
bool VerifyAccessToken(long StatusCode);
- CloudCacheResult CacheTypeExists(std::string_view TypeId, const IoHash& Key);
+ CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
- CloudCacheExistsResult CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys);
+ CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
spdlog::logger& m_Log;
RefPtr<CloudCacheClient> m_CacheClient;
@@ -189,8 +191,8 @@ public:
~CloudCacheClient();
CloudCacheAccessToken AcquireAccessToken();
- std::string_view DdcNamespace() const { return m_DdcNamespace; }
- std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
+ std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
+ std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
std::string_view ComputeCluster() const { return m_ComputeCluster; }
std::string_view ServiceUrl() const { return m_ServiceUrl; }
@@ -199,8 +201,8 @@ public:
private:
spdlog::logger& m_Log;
std::string m_ServiceUrl;
- std::string m_DdcNamespace;
- std::string m_BlobStoreNamespace;
+ std::string m_DefaultDdcNamespace;
+ std::string m_DefaultBlobStoreNamespace;
std::string m_ComputeCluster;
std::chrono::milliseconds m_ConnectTimeout{};
std::chrono::milliseconds m_Timeout{};
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index c870e0773..52513abe9 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -160,6 +160,24 @@ namespace detail {
}
}
+ std::string_view GetActualDdcNamespace(CloudCacheSession& Session, std::string_view Namespace)
+ {
+ if (Namespace == ZenCacheStore::DefaultNamespace)
+ {
+ return Session.Client().DefaultDdcNamespace();
+ }
+ return Namespace;
+ }
+
+ std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, std::string_view Namespace)
+ {
+ if (Namespace == ZenCacheStore::DefaultNamespace)
+ {
+ return Session.Client().DefaultBlobStoreNamespace();
+ }
+ return Namespace;
+ }
+
virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); }
virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
@@ -173,13 +191,16 @@ namespace detail {
CloudCacheSession Session(m_Client);
CloudCacheResult Result;
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace);
+
if (m_UseLegacyDdc && Type == ZenContentType::kBinary)
{
- Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash);
+ std::string_view DdcNamespace = GetActualDdcNamespace(Session, CacheKey.Namespace);
+ Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash);
}
else if (Type == ZenContentType::kCompressedBinary)
{
- Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+ Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
if (Result.Success)
{
@@ -190,23 +211,22 @@ namespace detail {
IoBuffer ContentBuffer;
int NumAttachments = 0;
- CacheRecord.IterateAttachments(
- [&Session, &Result, &ContentBuffer, &NumAttachments](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
- Result.Bytes += AttachmentResult.Bytes;
- Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
- Result.ErrorCode = AttachmentResult.ErrorCode;
+ CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ Result.Bytes += AttachmentResult.Bytes;
+ Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+ Result.ErrorCode = AttachmentResult.ErrorCode;
- if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
- {
- Result.Response = AttachmentResult.Response;
- ++NumAttachments;
- }
- else
- {
- Result.Success = false;
- }
- });
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ {
+ Result.Response = AttachmentResult.Response;
+ ++NumAttachments;
+ }
+ else
+ {
+ Result.Success = false;
+ }
+ });
if (NumAttachments != 1)
{
Result.Success = false;
@@ -217,7 +237,7 @@ namespace detail {
else
{
const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type;
- Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType);
+ Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, AcceptType);
if (Result.Success && Type == ZenContentType::kCbPackage)
{
@@ -228,8 +248,8 @@ namespace detail {
{
CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
- CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
Result.Bytes += AttachmentResult.Bytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
Result.ErrorCode = AttachmentResult.ErrorCode;
@@ -294,7 +314,9 @@ namespace detail {
if (!Result.Error)
{
- CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace);
+ CloudCacheResult RefResult =
+ Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
AppendResult(RefResult, Result);
m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
@@ -305,8 +327,8 @@ namespace detail {
if (ValidationResult == CbValidateError::None)
{
Record = LoadCompactBinaryObject(RefResult.Response);
- Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) {
- CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ Record.IterateAttachments([&](CbFieldView AttachmentHash) {
+ CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
AppendResult(BlobResult, Result);
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
@@ -329,14 +351,15 @@ namespace detail {
return Result;
}
- virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue");
try
{
CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace);
+ const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -376,8 +399,9 @@ namespace detail {
CompressedBuffer Compressed;
if (!Result.Error)
{
- const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId);
- Payload = BlobResult.Response;
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Request.Key.Namespace);
+ const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId);
+ Payload = BlobResult.Response;
AppendResult(BlobResult, Result);
@@ -422,13 +446,18 @@ namespace detail {
CloudCacheResult Result;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Key.Namespace);
if (m_UseLegacyDdc)
{
- Result = Session.PutDerivedData(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue);
+ Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue);
}
else
{
- Result = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kBinary);
+ Result = Session.PutRef(BlobStoreNamespace,
+ CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash,
+ RecordValue,
+ ZenContentType::kBinary);
}
}
@@ -527,7 +556,8 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Key.Namespace);
+ const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
for (const IoHash& ValueContentId : ValueContentIds)
{
IoBuffer BlobBuffer;
@@ -539,7 +569,7 @@ namespace detail {
CloudCacheResult BlobResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
- BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer);
+ BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer);
}
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
@@ -560,7 +590,7 @@ namespace detail {
PutRefResult RefResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
{
- RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject);
+ RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject);
}
m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
@@ -581,7 +611,7 @@ namespace detail {
}
const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer);
- FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash);
m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
@@ -599,7 +629,7 @@ namespace detail {
return {.Reason = std::move(Reason), .Success = false};
}
- FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
+ FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash);
m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
@@ -761,6 +791,7 @@ namespace detail {
const CacheKey& Key = Request->Key;
BatchRequest.BeginObject("Key"sv);
{
+ BatchRequest << "Namespace"sv << Key.Namespace;
BatchRequest << "Bucket"sv << Key.Bucket;
BatchRequest << "Hash"sv << Key.Hash;
}
@@ -871,6 +902,7 @@ namespace detail {
BatchRequest.BeginObject();
{
BatchRequest.BeginObject("Key"sv);
+ BatchRequest << "Namespace"sv << Request.Key.Namespace;
BatchRequest << "Bucket"sv << Request.Key.Bucket;
BatchRequest << "Hash"sv << Request.Key.Hash;
BatchRequest.EndObject();
@@ -1042,6 +1074,7 @@ namespace detail {
const CacheKey& Key = CacheRecord.Key;
BatchWriter.BeginObject("Key"sv);
{
+ BatchWriter << "Namespace"sv << Key.Namespace;
BatchWriter << "Bucket"sv << Key.Bucket;
BatchWriter << "Hash"sv << Key.Hash;
}
@@ -1517,7 +1550,7 @@ private:
ZenCacheValue CacheValue;
std::vector<IoBuffer> Payloads;
- if (!m_CacheStore.Get(ZenCacheStore::DefaultNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue))
+ if (!m_CacheStore.Get(CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue))
{
ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash);
return;
@@ -1531,7 +1564,8 @@ private:
}
else
{
- ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS",
+ ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS",
+ CacheRecord.Key.Namespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
ValueContentId);