aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-10 09:28:38 +0200
committerDan Engelbrecht <[email protected]>2022-05-11 16:13:36 +0200
commit41aa73ae51fd251969c6abf9ffafa5b40aad22e6 (patch)
treee6aba5b1a7cd434d6d44831e48e6e1fafc0da1f0
parentparameterize namespace for upstream (first hack) (diff)
downloadzen-41aa73ae51fd251969c6abf9ffafa5b40aad22e6.tar.xz
zen-41aa73ae51fd251969c6abf9ffafa5b40aad22e6.zip
Use configured namespace in Jupiter if not explicit namespace is given
DdcNamespace -> DefaultDdcNamespace BlobStoreNamespace -> DefaultBlobStoreNamespace
-rw-r--r--zenserver/upstream/hordecompute.cpp16
-rw-r--r--zenserver/upstream/jupiter.cpp4
-rw-r--r--zenserver/upstream/jupiter.h8
-rw-r--r--zenserver/upstream/upstreamcache.cpp120
4 files changed, 76 insertions, 72 deletions
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp
index 4d502a193..2ec24b303 100644
--- a/zenserver/upstream/hordecompute.cpp
+++ b/zenserver/upstream/hordecompute.cpp
@@ -198,7 +198,7 @@ namespace detail {
}
{
- PutRefResult RefResult = StorageSession.PutRef(StorageSession.Client().BlobStoreNamespace(),
+ PutRefResult RefResult = StorageSession.PutRef(StorageSession.Client().DefaultBlobStoreNamespace(),
"requests"sv,
UpstreamData.TaskId,
UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(),
@@ -293,7 +293,7 @@ namespace detail {
std::set<IoHash> Keys;
std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
- CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().BlobStoreNamespace(), Keys);
+ CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys);
Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}",
Keys.size(),
ExistsResult.Needs.size(),
@@ -310,7 +310,7 @@ namespace detail {
for (const auto& Hash : ExistsResult.Needs)
{
- CloudCacheResult Result = Session.PutBlob(Session.Client().BlobStoreNamespace(), Hash, Blobs.at(Hash));
+ CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, Blobs.at(Hash));
Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
@@ -340,7 +340,7 @@ namespace detail {
std::set<IoHash> Keys;
std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
- CloudCacheExistsResult ExistsResult = Session.ObjectExists(Session.Client().BlobStoreNamespace(), Keys);
+ CloudCacheExistsResult ExistsResult = Session.ObjectExists(Session.Client().DefaultBlobStoreNamespace(), Keys);
Log().debug("Queried {} missing objects Need={} Duration={}s Result={}",
Keys.size(),
ExistsResult.Needs.size(),
@@ -358,7 +358,7 @@ namespace detail {
for (const auto& Hash : ExistsResult.Needs)
{
CloudCacheResult Result =
- Session.PutObject(Session.Client().BlobStoreNamespace(), Hash, Objects.at(Hash).GetBuffer().AsIoBuffer());
+ Session.PutObject(Session.Client().DefaultBlobStoreNamespace(), Hash, Objects.at(Hash).GetBuffer().AsIoBuffer());
Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
@@ -694,7 +694,7 @@ namespace detail {
{
CloudCacheResult ObjectRefResult =
- Session.GetRef(Session.Client().BlobStoreNamespace(), "responses"sv, ResultHash, ZenContentType::kCbObject);
+ Session.GetRef(Session.Client().DefaultBlobStoreNamespace(), "responses"sv, ResultHash, ZenContentType::kCbObject);
Log().debug("Get ref {} Bytes={} Duration={}s Result={}",
ResultHash,
ObjectRefResult.Bytes,
@@ -722,7 +722,7 @@ namespace detail {
if (OutputHash != IoHash::Zero)
{
GetObjectReferencesResult ObjectReferenceResult =
- Session.GetObjectReferences(Session.Client().BlobStoreNamespace(), OutputHash);
+ Session.GetObjectReferences(Session.Client().DefaultBlobStoreNamespace(), OutputHash);
Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}",
ResultHash,
ObjectReferenceResult.References.size(),
@@ -752,7 +752,7 @@ namespace detail {
{
continue;
}
- CloudCacheResult BlobResult = Session.GetBlob(Session.Client().BlobStoreNamespace(), Hash);
+ CloudCacheResult BlobResult = Session.GetBlob(Session.Client().DefaultBlobStoreNamespace(), Hash);
Log().debug("Get blob {} Bytes={} Duration={}s Result={}",
Hash,
BlobResult.Bytes,
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 881798c1f..ddc6c49d2 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -890,8 +890,8 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken(
CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider)
: m_Log(zen::logging::Get("jupiter"))
, m_ServiceUrl(Options.ServiceUrl)
-, m_DdcNamespace(Options.DdcNamespace)
-, m_BlobStoreNamespace(Options.BlobStoreNamespace)
+, m_DefaultDdcNamespace(Options.DdcNamespace)
+, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
, m_ComputeCluster(Options.ComputeCluster)
, m_ConnectTimeout(Options.ConnectTimeout)
, m_Timeout(Options.Timeout)
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index a74d4d81d..3d9e6ea7b 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -191,8 +191,8 @@ public:
~CloudCacheClient();
CloudCacheAccessToken AcquireAccessToken();
- std::string_view DdcNamespace() const { return m_DdcNamespace; }
- std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
+ std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
+ std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
std::string_view ComputeCluster() const { return m_ComputeCluster; }
std::string_view ServiceUrl() const { return m_ServiceUrl; }
@@ -201,8 +201,8 @@ public:
private:
spdlog::logger& m_Log;
std::string m_ServiceUrl;
- std::string m_DdcNamespace;
- std::string m_BlobStoreNamespace;
+ std::string m_DefaultDdcNamespace;
+ std::string m_DefaultBlobStoreNamespace;
std::string m_ComputeCluster;
std::chrono::milliseconds m_ConnectTimeout{};
std::chrono::milliseconds m_Timeout{};
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 2b52b8efa..cbb32b13e 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -160,6 +160,24 @@ namespace detail {
}
}
+ std::string_view GetActualDdcNamespace(CloudCacheSession& Session, const std::string Namespace)
+ {
+ if (Namespace == ZenCacheStore::DefaultNamespace)
+ {
+ return Session.Client().DefaultDdcNamespace();
+ }
+ return Namespace;
+ }
+
+ std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, const std::string Namespace)
+ {
+ if (Namespace == ZenCacheStore::DefaultNamespace)
+ {
+ return Session.Client().DefaultBlobStoreNamespace();
+ }
+ return Namespace;
+ }
+
virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); }
virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); }
@@ -173,17 +191,16 @@ namespace detail {
CloudCacheSession Session(m_Client);
CloudCacheResult Result;
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace);
+
if (m_UseLegacyDdc && Type == ZenContentType::kBinary)
{
- Result =
- Session.GetDerivedData(CacheKey.Namespace, CacheKey.Bucket, CacheKey.Hash); // Session.Client().DdcNamespace(),
+ std::string_view DdcNamespace = GetActualDdcNamespace(Session, CacheKey.Namespace);
+ Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash);
}
else if (Type == ZenContentType::kCompressedBinary)
{
- Result = Session.GetRef(CacheKey.Namespace,
- CacheKey.Bucket,
- CacheKey.Hash,
- ZenContentType::kCbObject); // Session.Client().BlobStoreNamespace(),
+ Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
if (Result.Success)
{
@@ -194,24 +211,22 @@ namespace detail {
IoBuffer ContentBuffer;
int NumAttachments = 0;
- CacheRecord.IterateAttachments(
- [&Session, &Result, &ContentBuffer, &NumAttachments](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult =
- Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash());
- Result.Bytes += AttachmentResult.Bytes;
- Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
- Result.ErrorCode = AttachmentResult.ErrorCode;
+ CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ Result.Bytes += AttachmentResult.Bytes;
+ Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+ Result.ErrorCode = AttachmentResult.ErrorCode;
- if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
- {
- Result.Response = AttachmentResult.Response;
- ++NumAttachments;
- }
- else
- {
- Result.Success = false;
- }
- });
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ {
+ Result.Response = AttachmentResult.Response;
+ ++NumAttachments;
+ }
+ else
+ {
+ Result.Success = false;
+ }
+ });
if (NumAttachments != 1)
{
Result.Success = false;
@@ -222,10 +237,7 @@ namespace detail {
else
{
const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type;
- Result = Session.GetRef(CacheKey.Namespace,
- CacheKey.Bucket,
- CacheKey.Hash,
- AcceptType); // Session.Client().BlobStoreNamespace()
+ Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, AcceptType);
if (Result.Success && Type == ZenContentType::kCbPackage)
{
@@ -236,9 +248,8 @@ namespace detail {
{
CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
- CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult =
- Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash());
+ CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
Result.Bytes += AttachmentResult.Bytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
Result.ErrorCode = AttachmentResult.ErrorCode;
@@ -303,10 +314,9 @@ namespace detail {
if (!Result.Error)
{
- CloudCacheResult RefResult = Session.GetRef(CacheKey.Namespace,
- CacheKey.Bucket,
- CacheKey.Hash,
- ZenContentType::kCbObject); // Session.Client().BlobStoreNamespace()
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace);
+ CloudCacheResult RefResult =
+ Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
AppendResult(RefResult, Result);
m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
@@ -317,9 +327,8 @@ namespace detail {
if (ValidationResult == CbValidateError::None)
{
Record = LoadCompactBinaryObject(RefResult.Response);
- Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) {
- CloudCacheResult BlobResult =
- Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), AttachmentHash.AsHash());
+ Record.IterateAttachments([&](CbFieldView AttachmentHash) {
+ CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
AppendResult(BlobResult, Result);
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
@@ -342,14 +351,15 @@ namespace detail {
return Result;
}
- virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
{
ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue");
try
{
CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), ValueContentId);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace);
+ const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -389,8 +399,9 @@ namespace detail {
CompressedBuffer Compressed;
if (!Result.Error)
{
- const CloudCacheResult BlobResult = Session.GetCompressedBlob(Session.Client().BlobStoreNamespace(), Request.ChunkId);
- Payload = BlobResult.Response;
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Request.Key.Namespace);
+ const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId);
+ Payload = BlobResult.Response;
AppendResult(BlobResult, Result);
@@ -435,16 +446,14 @@ namespace detail {
CloudCacheResult Result;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Key.Namespace);
if (m_UseLegacyDdc)
{
- Result = Session.PutDerivedData(CacheRecord.Key.Namespace, // Session.Client().BlobStoreNamespace(),
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- RecordValue);
+ Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue);
}
else
{
- Result = Session.PutRef(CacheRecord.Key.Namespace, // Session.Client().BlobStoreNamespace(),
+ Result = Session.PutRef(BlobStoreNamespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
RecordValue,
@@ -547,7 +556,8 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Key.Namespace);
+ const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
for (const IoHash& ValueContentId : ValueContentIds)
{
IoBuffer BlobBuffer;
@@ -559,7 +569,7 @@ namespace detail {
CloudCacheResult BlobResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
- BlobResult = Session.PutCompressedBlob(Session.Client().BlobStoreNamespace(), ValueContentId, BlobBuffer);
+ BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer);
}
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
@@ -580,11 +590,7 @@ namespace detail {
PutRefResult RefResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
{
- RefResult = Session.PutRef(Key.Namespace, // Seesion.Client().BlobStoreNamespace(),
- Key.Bucket,
- Key.Hash,
- ObjectBuffer,
- ZenContentType::kCbObject);
+ RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject);
}
m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
@@ -604,9 +610,8 @@ namespace detail {
return {.Reason = std::move(Reason), .Success = false};
}
- const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer);
- FinalizeRefResult FinalizeResult =
- Session.FinalizeRef(Key.Namespace, Key.Bucket, Key.Hash, RefHash); // Session.Client().BlobStoreNamespace(),
+ const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash);
m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
@@ -624,8 +629,7 @@ namespace detail {
return {.Reason = std::move(Reason), .Success = false};
}
- FinalizeResult =
- Session.FinalizeRef(Key.Namespace, Key.Bucket, Key.Hash, RefHash); // Session.Client().BlobStoreNamespace(),
+ FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash);
m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);