aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/admin/admin.cpp49
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp4
-rw-r--r--src/zenserver/config.cpp27
-rw-r--r--src/zenserver/objectstore/objectstore.cpp6
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.cpp113
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp12
-rw-r--r--src/zenserver/projectstore/httpprojectstore.h2
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp89
-rw-r--r--src/zenserver/projectstore/projectstore.cpp18
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp7
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp2
-rw-r--r--src/zenserver/upstream/jupiter.cpp662
-rw-r--r--src/zenserver/upstream/jupiter.h256
-rw-r--r--src/zenserver/upstream/upstream.h1
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp120
-rw-r--r--src/zenserver/upstream/upstreamcache.h10
-rw-r--r--src/zenserver/upstream/upstreamservice.cpp1
-rw-r--r--src/zenserver/upstream/zen.cpp2
-rw-r--r--src/zenserver/workspaces/httpworkspaces.cpp2
-rw-r--r--src/zenserver/zenserver.cpp15
20 files changed, 239 insertions, 1159 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 847ed5a50..2888f5450 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -21,6 +21,7 @@
#include <zenstore/gc.h>
#include <zenstore/cache/structuredcachestore.h>
+#include <zenutil/workerpools.h>
#include "config.h"
#include "projectstore/projectstore.h"
@@ -41,31 +42,43 @@ GetStatsForDirectory(std::filesystem::path Dir)
if (!std::filesystem::exists(Dir))
return {};
- FileSystemTraversal Traversal;
-
- struct StatsTraversal : public FileSystemTraversal::TreeVisitor
+ struct StatsTraversal : public GetDirectoryContentVisitor
{
- virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override
+ virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override
{
- ZEN_UNUSED(Parent, File);
- ++TotalFileCount;
- TotalBytes += FileSize;
+ ZEN_UNUSED(RelativeRoot);
+
+ uint64_t FileCount = Content.FileNames.size();
+ uint64_t DirCount = Content.DirectoryNames.size();
+ uint64_t FilesSize = 0;
+ for (uint64_t FileSize : Content.FileSizes)
+ {
+ FilesSize += FileSize;
+ }
+ TotalBytes += FilesSize;
+ TotalFileCount += FileCount;
+ TotalDirCount += DirCount;
}
- virtual bool VisitDirectory(const std::filesystem::path&, const path_view&) override
+
+ std::atomic_uint64_t TotalBytes = 0;
+ std::atomic_uint64_t TotalFileCount = 0;
+ std::atomic_uint64_t TotalDirCount = 0;
+
+ DirStats GetStats()
{
- ++TotalDirCount;
- return true;
+ return {.FileCount = TotalFileCount.load(), .DirCount = TotalDirCount.load(), .ByteCount = TotalBytes.load()};
}
+ } DirTraverser;
- uint64_t TotalBytes = 0;
- uint64_t TotalFileCount = 0;
- uint64_t TotalDirCount = 0;
-
- DirStats GetStats() { return {.FileCount = TotalFileCount, .DirCount = TotalDirCount, .ByteCount = TotalBytes}; }
- };
+ Latch PendingWorkCount(1);
- StatsTraversal DirTraverser;
- Traversal.TraverseFileSystem(Dir, DirTraverser);
+ GetDirectoryContent(Dir,
+ DirectoryContentFlags::IncludeAllEntries | DirectoryContentFlags::IncludeFileSizes,
+ DirTraverser,
+ GetSmallWorkerPool(EWorkloadType::Burst),
+ PendingWorkCount);
+ PendingWorkCount.CountDown();
+ PendingWorkCount.Wait();
return DirTraverser.GetStats();
}
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 925c7b42d..b9a9ca380 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -18,15 +18,15 @@
#include <zencore/workthreadpool.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstats.h>
+#include <zenhttp/packageformat.h>
#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/gc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/cache/cacherequests.h>
#include <zenutil/cache/rpcrecording.h>
-#include <zenutil/packageformat.h>
+#include <zenutil/jupiter/jupiterclient.h>
#include <zenutil/workerpools.h>
-#include "upstream/jupiter.h"
#include "upstream/upstreamcache.h"
#include "upstream/zen.h"
#include "zenstore/cidstore.h"
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 0108e8b9f..809092378 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -5,6 +5,7 @@
#include "config/luaconfig.h"
#include "diag/logging.h"
+#include <zencore/basicfile.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/crypto.h>
@@ -14,7 +15,6 @@
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zenhttp/zenhttp.h>
-#include <zenutil/basicfile.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/format.h>
@@ -63,7 +63,7 @@ ReadAllCentralManifests(const std::filesystem::path& SystemRoot)
std::vector<CbObject> Manifests;
DirectoryContent Content;
- GetDirectoryContent(SystemRoot / "States", DirectoryContent::IncludeFilesFlag, Content);
+ GetDirectoryContent(SystemRoot / "States", DirectoryContentFlags::IncludeFiles, Content);
for (std::filesystem::path& File : Content.Files)
{
@@ -400,25 +400,25 @@ ParseConfigFile(const std::filesystem::path& Path,
#endif
////// stats
- LuaOptions.AddOption("stats.enable"sv, ServerOptions.StatsConfig.Enabled);
+ LuaOptions.AddOption("stats.enable"sv, ServerOptions.StatsConfig.Enabled, "statsd"sv);
LuaOptions.AddOption("stats.host"sv, ServerOptions.StatsConfig.StatsdHost);
LuaOptions.AddOption("stats.port"sv, ServerOptions.StatsConfig.StatsdPort);
////// cache
LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheConfig.Enabled);
- LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log");
- LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheConfig.AccessLogEnabled, "cache-access-log");
+ LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log"sv);
+ LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheConfig.AccessLogEnabled, "cache-access-log"sv);
LuaOptions.AddOption("cache.memlayer.sizethreshold"sv,
ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold,
- "cache-memlayer-sizethreshold");
+ "cache-memlayer-sizethreshold"sv);
LuaOptions.AddOption("cache.memlayer.targetfootprint"sv,
ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes,
- "cache-memlayer-targetfootprint");
+ "cache-memlayer-targetfootprint"sv);
LuaOptions.AddOption("cache.memlayer.triminterval"sv,
ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds,
- "cache-memlayer-triminterval");
- LuaOptions.AddOption("cache.memlayer.maxage"sv, ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds, "cache-memlayer-maxage");
+ "cache-memlayer-triminterval"sv);
+ LuaOptions.AddOption("cache.memlayer.maxage"sv, ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds, "cache-memlayer-maxage"sv);
////// cache.upstream
LuaOptions.AddOption("cache.upstream.policy"sv, ServerOptions.UpstreamCacheConfig.CachePolicy, "upstream-cache-policy"sv);
@@ -461,6 +461,7 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("cache.upstream.zen.dns"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Dns);
LuaOptions.AddOption("cache.upstream.zen.url"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Urls);
+ ////// gc
LuaOptions.AddOption("gc.enabled"sv, ServerOptions.GcConfig.Enabled, "gc-enabled"sv);
LuaOptions.AddOption("gc.v2"sv, ServerOptions.GcConfig.UseGCV2, "gc-v2"sv);
@@ -487,22 +488,24 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("gc.attachment.passes"sv, ServerOptions.GcConfig.AttachmentPassCount, "gc-attachment-passes"sv);
LuaOptions.AddOption("gc.validation"sv, ServerOptions.GcConfig.EnableValidation, "gc-validation");
- ////// gc
LuaOptions.AddOption("gc.cache.maxdurationseconds"sv, ServerOptions.GcConfig.Cache.MaxDurationSeconds, "gc-cache-duration-seconds"sv);
+ LuaOptions.AddOption("gc.projectstore.duration.seconds"sv,
+ ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds,
+ "gc-projectstore-duration-seconds");
////// security
LuaOptions.AddOption("security.encryptionaeskey"sv, ServerOptions.EncryptionKey, "encryption-aes-key"sv);
LuaOptions.AddOption("security.encryptionaesiv"sv, ServerOptions.EncryptionIV, "encryption-aes-iv"sv);
LuaOptions.AddOption("security.openidproviders"sv, ServerOptions.AuthConfig);
- LuaOptions.Parse(Path, CmdLineResult);
-
////// workspaces
LuaOptions.AddOption("workspaces.enabled"sv, ServerOptions.WorksSpacesConfig.Enabled, "workspaces-enabled"sv);
LuaOptions.AddOption("workspaces.allowconfigchanges"sv,
ServerOptions.WorksSpacesConfig.AllowConfigurationChanges,
"workspaces-allow-changes"sv);
+ LuaOptions.Parse(Path, CmdLineResult);
+
// These have special command line processing so we make sure we export them if they were configured on command line
if (!ServerOptions.AuthConfig.OpenIdProviders.empty())
{
diff --git a/src/zenserver/objectstore/objectstore.cpp b/src/zenserver/objectstore/objectstore.cpp
index e614b256b..b0212ab07 100644
--- a/src/zenserver/objectstore/objectstore.cpp
+++ b/src/zenserver/objectstore/objectstore.cpp
@@ -3,6 +3,7 @@
#include <objectstore/objectstore.h>
#include <zencore/base64.h>
+#include <zencore/basicfile.h>
#include <zencore/compactbinaryvalue.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -12,7 +13,6 @@
#include "zencore/compactbinarybuilder.h"
#include "zenhttp/httpcommon.h"
#include "zenhttp/httpserver.h"
-#include "zenutil/basicfile.h"
#include <filesystem>
#include <thread>
@@ -376,7 +376,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s
Writer.BeginArray("Contents"sv);
}
- void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize) override
+ void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override
{
const fs::path FullPath = Parent / fs::path(File);
fs::path RelativePath = fs::relative(FullPath, BucketPath);
@@ -390,7 +390,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s
Writer.EndObject();
}
- bool VisitDirectory(const std::filesystem::path&, const path_view&) override { return false; }
+ bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return false; }
CbObject GetResult()
{
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
index 6d0d51a60..302b81729 100644
--- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
@@ -6,8 +6,10 @@
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
-#include <upstream/jupiter.h>
-#include <zenhttp/auth/authmgr.h>
+#include <zenhttp/httpclientauth.h>
+
+#include <zenutil/jupiter/jupiterclient.h>
+#include <zenutil/jupiter/jupitersession.h>
namespace zen {
@@ -18,7 +20,7 @@ static const std::string_view OplogContainerPartName = "oplogcontainer"sv;
class BuildsRemoteStore : public RemoteProjectStore
{
public:
- BuildsRemoteStore(Ref<CloudCacheClient>&& CloudClient,
+ BuildsRemoteStore(Ref<JupiterClient>&& InJupiterClient,
std::string_view Namespace,
std::string_view Bucket,
const Oid& BuildId,
@@ -26,7 +28,7 @@ public:
bool ForceDisableBlocks,
bool ForceDisableTempBlocks,
const std::filesystem::path& TempFilePath)
- : m_CloudClient(std::move(CloudClient))
+ : m_JupiterClient(std::move(InJupiterClient))
, m_Namespace(Namespace)
, m_Bucket(Bucket)
, m_BuildId(BuildId)
@@ -50,7 +52,7 @@ public:
.UseTempBlockFiles = m_UseTempBlocks,
.AllowChunking = true,
.ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId),
- .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)};
+ .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)};
}
virtual Stats GetStats() const override
@@ -68,18 +70,18 @@ public:
{
ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
- CloudCacheSession Session(m_CloudClient.Get());
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
IoBuffer Payload = m_MetaData;
Payload.SetContentType(ZenContentType::kCbObject);
- CloudCacheResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload);
+ JupiterResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload);
AddStats(PutResult);
CreateContainerResult Result{ConvertResult(PutResult)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -92,7 +94,7 @@ public:
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- CloudCacheSession Session(m_CloudClient.Get());
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
PutBuildPartResult PutResult =
Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload);
AddStats(PutResult);
@@ -101,7 +103,7 @@ public:
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -115,9 +117,9 @@ public:
virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- CloudCacheSession Session(m_CloudClient.Get());
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
- CloudCacheResult PutResult =
+ JupiterResult PutResult =
Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, ZenContentType::kCompressedBinary, Payload);
AddStats(PutResult);
@@ -125,7 +127,7 @@ public:
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -176,14 +178,14 @@ public:
IoBuffer MetaPayload = Writer.Save().GetBuffer().AsIoBuffer();
MetaPayload.SetContentType(ZenContentType::kCbObject);
- CloudCacheResult PutMetaResult =
+ JupiterResult PutMetaResult =
Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload);
AddStats(PutMetaResult);
RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult);
if (MetaDataResult.ErrorCode)
{
ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -215,7 +217,7 @@ public:
ZEN_UNUSED(RawHash);
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- CloudCacheSession Session(m_CloudClient.Get());
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
FinalizeBuildPartResult FinalizeRefResult =
Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash);
AddStats(FinalizeRefResult);
@@ -224,7 +226,7 @@ public:
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -233,14 +235,14 @@ public:
}
else if (Result.Needs.empty())
{
- CloudCacheResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId);
+ JupiterResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId);
AddStats(FinalizeBuildResult);
FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds;
Result = {ConvertResult(FinalizeBuildResult)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -254,14 +256,14 @@ public:
{
ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
- CloudCacheSession Session(m_CloudClient.Get());
- CloudCacheResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
+ JupiterResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId);
AddStats(GetBuildResult);
LoadContainerResult Result{ConvertResult(GetBuildResult)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -273,7 +275,7 @@ public:
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv,
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId);
@@ -284,7 +286,7 @@ public:
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv,
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId);
@@ -295,7 +297,7 @@ public:
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv,
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -303,7 +305,7 @@ public:
return Result;
}
- CloudCacheResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId);
+ JupiterResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId);
AddStats(GetBuildPartResult);
Result = {ConvertResult(GetBuildResult)};
Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds;
@@ -311,7 +313,7 @@ public:
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -325,7 +327,7 @@ public:
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv,
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -339,15 +341,15 @@ public:
virtual GetKnownBlocksResult GetKnownBlocks() override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- CloudCacheSession Session(m_CloudClient.Get());
- CloudCacheResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
+ JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId);
AddStats(FindResult);
GetKnownBlocksResult Result{ConvertResult(FindResult)};
if (Result.ErrorCode)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -360,7 +362,7 @@ public:
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv,
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -392,15 +394,15 @@ public:
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- CloudCacheSession Session(m_CloudClient.Get());
- CloudCacheResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
+ JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath);
AddStats(GetResult);
LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
if (GetResult.ErrorCode)
{
Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}&{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
@@ -429,7 +431,7 @@ public:
}
private:
- void AddStats(const CloudCacheResult& Result)
+ void AddStats(const JupiterResult& Result)
{
m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes));
m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes));
@@ -445,7 +447,7 @@ private:
m_RequestCount.fetch_add(1);
}
- static Result ConvertResult(const CloudCacheResult& Response)
+ static Result ConvertResult(const JupiterResult& Response)
{
std::string Text;
int32_t ErrorCode = 0;
@@ -482,7 +484,7 @@ private:
return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text};
}
- Ref<CloudCacheClient> m_CloudClient;
+ Ref<JupiterClient> m_JupiterClient;
const std::string m_Namespace;
const std::string m_Bucket;
const Oid m_BuildId;
@@ -510,44 +512,35 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file
// Assume https URL
Url = fmt::format("https://{}"sv, Url);
}
- CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv,
- .ServiceUrl = Url,
- .ConnectTimeout = std::chrono::milliseconds(2000),
- .Timeout = std::chrono::milliseconds(1800000),
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 4};
+ JupiterClientOptions ClientOptions{.Name = "Remote store"sv,
+ .ServiceUrl = Url,
+ .ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(1800000),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 4};
// 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
// 2) Access token as parameter in request
// 3) Environment variable (different win vs linux/mac)
// 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+ std::function<HttpClientAccessToken()> TokenProvider;
if (!Options.OpenIdProvider.empty())
{
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() {
- AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
+ TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider);
}
else if (!Options.AccessToken.empty())
{
- TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() {
- return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()};
- });
+ TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken);
}
else
{
- TokenProvider = CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager]() {
- AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken("Default");
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
+ TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager);
}
- Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider)));
+ Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider)));
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(CloudClient),
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(Client),
Options.Namespace,
Options.Bucket,
Options.BuildId,
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index eb6407e1f..0b8e5f13b 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -1061,12 +1061,8 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
bool IsValid = true;
std::vector<IoHash> MissingChunks;
- std::vector<IoHash> ReferencedChunks;
CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer {
- // We want to add all chunks here so we can properly clear them from the 'prep' call where we retained them earlier
- ReferencedChunks.push_back(Hash);
-
if (m_CidStore.ContainsChunk(Hash))
{
// Return null attachment as we already have it, no point in reading it and storing it again
@@ -1155,6 +1151,9 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified");
}
+ std::vector<IoHash> ReferencedChunks;
+ Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); });
+
// Write core to oplog
size_t AttachmentCount = Package.GetAttachments().size();
@@ -1168,7 +1167,10 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
m_ProjectStats.ChunkWriteCount += AttachmentCount;
// Once we stored the op, we no longer need to retain any chunks this op references
- FoundLog->RemovePendingChunkReferences(ReferencedChunks);
+ if (!ReferencedChunks.empty())
+ {
+ FoundLog->RemovePendingChunkReferences(ReferencedChunks);
+ }
m_ProjectStats.OpWriteCount++;
ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString());
diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h
index 13810bd66..8e74c57a5 100644
--- a/src/zenserver/projectstore/httpprojectstore.h
+++ b/src/zenserver/projectstore/httpprojectstore.h
@@ -3,13 +3,13 @@
#pragma once
#include <zencore/stats.h>
-#include <zenhttp/auth/authmgr.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstats.h>
#include <zenstore/cidstore.h>
namespace zen {
+class AuthMgr;
class ProjectStore;
//////////////////////////////////////////////////////////////////////////
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index f4fe578ff..e906127ff 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -5,8 +5,10 @@
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
-#include <upstream/jupiter.h>
-#include <zenhttp/auth/authmgr.h>
+#include <zenhttp/httpclientauth.h>
+
+#include <zenutil/jupiter/jupiterclient.h>
+#include <zenutil/jupiter/jupitersession.h>
namespace zen {
@@ -15,7 +17,7 @@ using namespace std::literals;
class JupiterRemoteStore : public RemoteProjectStore
{
public:
- JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient,
+ JupiterRemoteStore(Ref<JupiterClient>&& InJupiterClient,
std::string_view Namespace,
std::string_view Bucket,
const IoHash& Key,
@@ -23,7 +25,7 @@ public:
bool ForceDisableBlocks,
bool ForceDisableTempBlocks,
const std::filesystem::path& TempFilePath)
- : m_CloudClient(std::move(CloudClient))
+ : m_JupiterClient(std::move(InJupiterClient))
, m_Namespace(Namespace)
, m_Bucket(Bucket)
, m_Key(Key)
@@ -47,7 +49,7 @@ public:
.AllowChunking = true,
.ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key),
.Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv,
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_Key,
@@ -73,15 +75,15 @@ public:
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
- CloudCacheSession Session(m_CloudClient.Get());
- PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
+ PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
AddStats(PutResult);
SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_Key,
@@ -92,15 +94,15 @@ public:
virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override
{
- CloudCacheSession Session(m_CloudClient.Get());
- CloudCacheResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
+ JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
AddStats(PutResult);
SaveAttachmentResult Result{ConvertResult(PutResult)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
RawHash,
Result.Reason);
@@ -125,7 +127,7 @@ public:
virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override
{
- CloudCacheSession Session(m_CloudClient.Get());
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
AddStats(FinalizeRefResult);
@@ -133,7 +135,7 @@ public:
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_Key,
@@ -162,8 +164,8 @@ public:
{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}};
}
- CloudCacheSession Session(m_CloudClient.Get());
- CloudCacheExistsResult ExistsResult =
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
+ JupiterExistsResult ExistsResult =
Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end()));
AddStats(ExistsResult);
@@ -172,7 +174,7 @@ public:
return GetKnownBlocksResult{{.ErrorCode = ExistsResult.ErrorCode,
.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds,
.Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
ExistsResult.Reason)}};
}
@@ -201,15 +203,15 @@ public:
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
- CloudCacheSession Session(m_CloudClient.Get());
- CloudCacheResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
+ JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
AddStats(GetResult);
LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
if (GetResult.ErrorCode)
{
Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
RawHash,
Result.Reason);
@@ -237,14 +239,14 @@ public:
private:
LoadContainerResult LoadContainer(const IoHash& Key)
{
- CloudCacheSession Session(m_CloudClient.Get());
- CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
+ JupiterResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject);
AddStats(GetResult);
if (GetResult.ErrorCode || !GetResult.Success)
{
LoadContainerResult Result{ConvertResult(GetResult)};
Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
Key,
@@ -259,7 +261,7 @@ private:
RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
.ElapsedSeconds = GetResult.ElapsedSeconds,
.Reason = fmt::format("The ref {}/{}/{}/{} is not formatted as a compact binary object"sv,
- m_CloudClient->ServiceUrl(),
+ m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
Key)},
@@ -268,7 +270,7 @@ private:
return LoadContainerResult{ConvertResult(GetResult), std::move(ContainerObject)};
}
- void AddStats(const CloudCacheResult& Result)
+ void AddStats(const JupiterResult& Result)
{
m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes));
m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes));
@@ -284,7 +286,7 @@ private:
m_RequestCount.fetch_add(1);
}
- static Result ConvertResult(const CloudCacheResult& Response)
+ static Result ConvertResult(const JupiterResult& Response)
{
std::string Text;
int32_t ErrorCode = 0;
@@ -321,7 +323,7 @@ private:
return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text};
}
- Ref<CloudCacheClient> m_CloudClient;
+ Ref<JupiterClient> m_JupiterClient;
const std::string m_Namespace;
const std::string m_Bucket;
const IoHash m_Key;
@@ -348,44 +350,35 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi
// Assume https URL
Url = fmt::format("https://{}"sv, Url);
}
- CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv,
- .ServiceUrl = Url,
- .ConnectTimeout = std::chrono::milliseconds(2000),
- .Timeout = std::chrono::milliseconds(1800000),
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 4};
+ JupiterClientOptions ClientOptions{.Name = "Remote store"sv,
+ .ServiceUrl = Url,
+ .ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(1800000),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 4};
// 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
// 2) Access token as parameter in request
// 3) Environment variable (different win vs linux/mac)
// 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+ std::function<HttpClientAccessToken()> TokenProvider;
if (!Options.OpenIdProvider.empty())
{
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() {
- AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
+ TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider);
}
else if (!Options.AccessToken.empty())
{
- TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() {
- return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()};
- });
+ TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken);
}
else
{
- TokenProvider = CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager]() {
- AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken("Default");
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
+ TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager);
}
- Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider)));
+ Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider)));
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(CloudClient),
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(Client),
Options.Namespace,
Options.Bucket,
Options.Key,
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index d39d78cb9..46a236af9 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -16,12 +16,12 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zenhttp/packageformat.h>
#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/cache/rpcrecording.h>
#include <zenutil/openprocesscache.h>
-#include <zenutil/packageformat.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
@@ -3506,7 +3506,7 @@ ProjectStore::Project::ScanForOplogs() const
if (Project::Exists(m_OplogStoragePath))
{
DirectoryContent DirContent;
- GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent);
+ GetDirectoryContent(m_OplogStoragePath, DirectoryContentFlags::IncludeDirs, DirContent);
Oplogs.reserve(DirContent.Directories.size());
for (const std::filesystem::path& DirPath : DirContent.Directories)
{
@@ -3859,7 +3859,7 @@ ProjectStore::DiscoverProjects()
}
DirectoryContent DirContent;
- GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+ GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent);
for (const std::filesystem::path& DirPath : DirContent.Directories)
{
@@ -3965,7 +3965,7 @@ ProjectStore::StorageSize() const
if (std::filesystem::exists(m_ProjectBasePath))
{
DirectoryContent ProjectsFolderContent;
- GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, ProjectsFolderContent);
+ GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, ProjectsFolderContent);
for (const std::filesystem::path& ProjectBasePath : ProjectsFolderContent.Directories)
{
@@ -3974,7 +3974,7 @@ ProjectStore::StorageSize() const
{
Result.DiskSize += Project::TotalSize(ProjectBasePath);
DirectoryContent DirContent;
- GetDirectoryContent(ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+ GetDirectoryContent(ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent);
for (const std::filesystem::path& OplogBasePath : DirContent.Directories)
{
Result.DiskSize += Oplog::TotalSize(OplogBasePath);
@@ -5433,7 +5433,11 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
Project->TouchProject();
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
+ std::string_view Method = Cb["method"sv].AsString();
+
+ bool VerifyPathOnDisk = Method != "getchunks"sv;
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk);
if (!Oplog)
{
HttpReq.WriteResponse(HttpResponseCode::NotFound,
@@ -5443,8 +5447,6 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
Project->TouchOplog(OplogId);
- std::string_view Method = Cb["method"sv].AsString();
-
if (Method == "import"sv)
{
if (!AreDiskWritesAllowed())
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 216b1c4dd..0589fdc5f 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -266,7 +266,10 @@ namespace remotestore_impl {
AppendBatch();
}
- ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0);
+ if (OpCount > 0)
+ {
+ ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0);
+ }
return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
}
@@ -1685,7 +1688,7 @@ BuildContainer(CidStore& ChunkStore,
IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
if (RawData)
{
- if (RawData.GetSize() > ChunkFileSizeLimit)
+ if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit)
{
IoBufferFileReference FileRef;
(void)RawData.GetFileReference(FileRef);
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index 566d0d4b2..42519b108 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -8,7 +8,7 @@
#include <zencore/fmtutils.h>
#include <zencore/stream.h>
#include <zenhttp/httpclient.h>
-#include <zenutil/packageformat.h>
+#include <zenhttp/packageformat.h>
namespace zen {
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp
deleted file mode 100644
index 2ae977f00..000000000
--- a/src/zenserver/upstream/jupiter.cpp
+++ /dev/null
@@ -1,662 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "jupiter.h"
-
-#include "diag/logging.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/compositebuffer.h>
-#include <zencore/fmtutils.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/scopeguard.h>
-#include <zencore/thread.h>
-#include <zencore/trace.h>
-#include <zenhttp/formatters.h>
-#include <zenutil/basicfile.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <fmt/format.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#if ZEN_PLATFORM_WINDOWS
-# pragma comment(lib, "Crypt32.lib")
-# pragma comment(lib, "Wldap32.lib")
-#endif
-
-#include <json11.hpp>
-
-using namespace std::literals;
-
-namespace zen {
-
-namespace detail {
- CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
- {
- if (Response.Error)
- {
- return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
- .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .ErrorCode = Response.Error.value().ErrorCode,
- .Reason = Response.ErrorMessage(ErrorPrefix),
- .Success = false};
- }
- if (!Response.IsSuccess())
- {
- return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
- .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .ErrorCode = static_cast<int32_t>(Response.StatusCode),
- .Reason = Response.ErrorMessage(ErrorPrefix),
- .Success = false};
- }
- return {.Response = Response.ResponsePayload,
- .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
- .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .ErrorCode = 0,
- .Success = true};
- }
-} // namespace detail
-
-CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
-{
-}
-
-CloudCacheSession::~CloudCacheSession()
-{
-}
-
-CloudCacheResult
-CloudCacheSession::Authenticate()
-{
- bool OK = m_CacheClient->m_HttpClient.Authenticate();
- return {.Success = OK};
-}
-
-CloudCacheResult
-CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
-{
- ZEN_TRACE_CPU("JupiterClient::GetRef");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- {HttpClient::Accept(RefType)});
-
- return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::GetBlob");
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()),
- {HttpClient::Accept(ZenContentType::kBinary)});
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
-{
- ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
- TempFolderPath,
- {HttpClient::Accept(ZenContentType::kCompressedBinary)});
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::GetInlineBlob(std::string_view Namespace,
- std::string_view BucketId,
- const IoHash& Key,
- IoHash& OutPayloadHash,
- std::filesystem::path TempFolderPath)
-{
- ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- TempFolderPath,
- {{"Accept", "application/x-jupiter-inline"}});
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
-
- if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
- {
- const std::string& PayloadHashHeader = It->second;
- if (PayloadHashHeader.length() == IoHash::StringLength)
- {
- OutPayloadHash = IoHash::FromHexString(PayloadHashHeader);
- }
- }
-
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::GetObject");
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
- {HttpClient::Accept(ZenContentType::kCbObject)});
-
- return detail::ConvertResponse(Response);
-}
-
-PutRefResult
-CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
-{
- ZEN_TRACE_CPU("JupiterClient::PutRef");
-
- Ref.SetContentType(RefType);
-
- IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- Ref,
- {{"X-Jupiter-IoHash", Hash.ToHexString()}});
-
- PutRefResult Result = {detail::ConvertResponse(Response)};
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- Result.RawHash = Hash;
- }
- return Result;
-}
-
-FinalizeRefResult
-CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
-{
- ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
- fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
- {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
-
- FinalizeRefResult Result = {detail::ConvertResponse(Response)};
-
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
-{
- ZEN_TRACE_CPU("JupiterClient::PutBlob");
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
-{
- ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
-
- Blob.SetContentType(ZenContentType::kCompressedBinary);
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
-{
- ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
- Payload,
- ZenContentType::kCompressedBinary);
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
-{
- ZEN_TRACE_CPU("JupiterClient::PutObject");
-
- Object.SetContentType(ZenContentType::kCbObject);
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::RefExists");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
-
- return detail::ConvertResponse(Response);
-}
-
-GetObjectReferencesResult
-CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
- {HttpClient::Accept(ZenContentType::kCbObject)});
-
- GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
-
- if (Result.Success)
- {
- const CbObject ReferencesResponse = Response.AsObject();
- for (auto& Item : ReferencesResponse["references"sv])
- {
- Result.References.insert(Item.AsHash());
- }
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key)
-{
- return CacheTypeExists(Namespace, "blobs"sv, Key);
-}
-
-CloudCacheResult
-CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key)
-{
- return CacheTypeExists(Namespace, "compressed-blobs"sv, Key);
-}
-
-CloudCacheResult
-CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key)
-{
- return CacheTypeExists(Namespace, "objects"sv, Key);
-}
-
-CloudCacheExistsResult
-CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
-{
- return CacheTypeExists(Namespace, "blobs"sv, Keys);
-}
-
-CloudCacheExistsResult
-CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
-{
- return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys);
-}
-
-CloudCacheExistsResult
-CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys)
-{
- return CacheTypeExists(Namespace, "objects"sv, Keys);
-}
-
-std::vector<IoHash>
-CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
-{
- // ExtendableStringBuilder<256> Uri;
- // Uri << m_CacheClient->ServiceUrl();
- // Uri << "/api/v1/s/" << Namespace;
-
- ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
-
- return {};
-}
-
-CloudCacheResult
-CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheExistsResult
-CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
-{
- ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
-
- ExtendableStringBuilder<256> Body;
- Body << "[";
- for (const auto& Key : Keys)
- {
- Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
- }
- Body << "]";
- IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
- Payload.SetContentType(ZenContentType::kJSON);
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace),
- Payload,
- {HttpClient::Accept(ZenContentType::kCbObject)});
-
- CloudCacheExistsResult Result = {detail::ConvertResponse(Response)};
-
- if (Result.Success)
- {
- const CbObject ExistsResponse = Response.AsObject();
- for (auto& Item : ExistsResponse["needs"sv])
- {
- Result.Needs.insert(Item.AsHash());
- }
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload);
- return detail::ConvertResponse(Response, "CloudCacheSession::PutBuild"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId),
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "CloudCacheSession::GetBuild"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
-{
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId));
- return detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuild"sv);
-}
-
-PutBuildPartResult
-CloudCacheSession::PutBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- std::string_view PartName,
- const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
-
- IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName),
- Payload,
- {{"X-Jupiter-IoHash", Hash.ToHexString()}});
-
- PutBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::PutBuildPart"sv)};
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- Result.RawHash = Hash;
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
-{
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId),
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildPart"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::PutBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- Payload,
- ContentType);
- return detail::ConvertResponse(Response, "CloudCacheSession::PutBuildBlob"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::GetBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- std::filesystem::path TempFolderPath)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Download(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- TempFolderPath);
- return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildBlob"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::PutBlockMetadata(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- Payload);
- return detail::ConvertResponse(Response, "CloudCacheSession::PutBlockMetadata"sv);
-}
-
-FinalizeBuildPartResult
-CloudCacheSession::FinalizeBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& RawHash)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()),
- HttpClient::Accept(ZenContentType::kCbObject));
-
- FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuildPart"sv)};
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId),
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "CloudCacheSession::FindBlocks"sv);
-}
-
-/**
- * An access token provider that holds a token that will never change.
- */
-class StaticTokenProvider final : public CloudCacheTokenProvider
-{
-public:
- StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {}
-
- virtual ~StaticTokenProvider() = default;
-
- virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; }
-
-private:
- CloudCacheAccessToken m_Token;
-};
-
-std::unique_ptr<CloudCacheTokenProvider>
-CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token)
-{
- return std::make_unique<StaticTokenProvider>(std::move(Token));
-}
-
-class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider
-{
-public:
- OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params)
- {
- m_Url = std::string(Params.Url);
- m_ClientId = std::string(Params.ClientId);
- m_ClientSecret = std::string(Params.ClientSecret);
- }
-
- virtual ~OAuthClientCredentialsTokenProvider() = default;
-
- virtual CloudCacheAccessToken AcquireAccessToken() final override
- {
- using namespace std::chrono;
-
- std::string Body =
- fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret);
-
- cpr::Response Response =
- cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)});
-
- if (Response.error || Response.status_code != 200)
- {
- return {};
- }
-
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.text, JsonError);
-
- if (JsonError.empty() == false)
- {
- return {};
- }
-
- std::string Token = Json["access_token"].string_value();
- int64_t ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value());
- CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds);
-
- return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime};
- }
-
-private:
- std::string m_Url;
- std::string m_ClientId;
- std::string m_ClientSecret;
-};
-
-std::unique_ptr<CloudCacheTokenProvider>
-CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params)
-{
- return std::make_unique<OAuthClientCredentialsTokenProvider>(Params);
-}
-
-class CallbackTokenProvider final : public CloudCacheTokenProvider
-{
-public:
- CallbackTokenProvider(std::function<CloudCacheAccessToken()>&& Callback) : m_Callback(std::move(Callback)) {}
-
- virtual ~CallbackTokenProvider() = default;
-
- virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); }
-
-private:
- std::function<CloudCacheAccessToken()> m_Callback;
-};
-
-std::unique_ptr<CloudCacheTokenProvider>
-CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback)
-{
- return std::make_unique<CallbackTokenProvider>(std::move(Callback));
-}
-
-static std::optional<std::function<HttpClientAccessToken()>>
-GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider)
-{
- if (TokenProvider == nullptr)
- {
- return {};
- }
- auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken {
- CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken();
- return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime};
- };
- return ProviderFunc;
-}
-
-CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider)
-: m_Log(zen::logging::Get("jupiter"))
-, m_DefaultDdcNamespace(Options.DdcNamespace)
-, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
-, m_ComputeCluster(Options.ComputeCluster)
-, m_TokenProvider(std::move(TokenProvider))
-, m_HttpClient(Options.ServiceUrl,
- HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
- .Timeout = Options.Timeout,
- .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()),
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = Options.AllowResume,
- .RetryCount = Options.RetryCount})
-{
- ZEN_ASSERT(m_TokenProvider.get() != nullptr);
-}
-
-CloudCacheClient::~CloudCacheClient()
-{
-}
-
-} // namespace zen
diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h
deleted file mode 100644
index 50e4ad68a..000000000
--- a/src/zenserver/upstream/jupiter.h
+++ /dev/null
@@ -1,256 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zenbase/refcount.h>
-#include <zencore/iohash.h>
-#include <zencore/logging.h>
-#include <zencore/thread.h>
-#include <zenhttp/httpclient.h>
-#include <zenhttp/httpserver.h>
-
-#include <atomic>
-#include <chrono>
-#include <list>
-#include <memory>
-#include <set>
-#include <vector>
-
-struct ZenCacheValue;
-
-namespace cpr {
-class Session;
-}
-
-namespace zen {
-
-class CbObjectView;
-class CloudCacheClient;
-class IoBuffer;
-struct IoHash;
-
-/**
- * Cached access token, for use with `Authorization:` header
- */
-struct CloudCacheAccessToken
-{
- using Clock = std::chrono::system_clock;
- using TimePoint = Clock::time_point;
-
- static constexpr int64_t ExpireMarginInSeconds = 30;
-
- std::string Value;
- TimePoint ExpireTime;
-
- bool IsValid() const
- {
- return Value.empty() == false &&
- ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count();
- }
-};
-
-struct CloudCacheResult
-{
- IoBuffer Response;
- uint64_t SentBytes{};
- uint64_t ReceivedBytes{};
- double ElapsedSeconds{};
- int32_t ErrorCode{};
- std::string Reason;
- bool Success = false;
-};
-
-struct PutRefResult : CloudCacheResult
-{
- std::vector<IoHash> Needs;
- IoHash RawHash;
-};
-
-struct FinalizeRefResult : CloudCacheResult
-{
- std::vector<IoHash> Needs;
-};
-
-struct CloudCacheExistsResult : CloudCacheResult
-{
- std::set<IoHash> Needs;
-};
-
-struct GetObjectReferencesResult : CloudCacheResult
-{
- std::set<IoHash> References;
-};
-
-struct PutBuildPartResult : CloudCacheResult
-{
- std::vector<IoHash> Needs;
- IoHash RawHash;
-};
-
-struct FinalizeBuildPartResult : CloudCacheResult
-{
- std::vector<IoHash> Needs;
-};
-
-/**
- * Context for performing Jupiter operations
- *
- * Maintains an HTTP connection so that subsequent operations don't need to go
- * through the whole connection setup process
- *
- */
-class CloudCacheSession
-{
-public:
- CloudCacheSession(CloudCacheClient* CacheClient);
- ~CloudCacheSession();
-
- CloudCacheResult Authenticate();
-
- CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
- CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
- CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult GetInlineBlob(std::string_view Namespace,
- std::string_view BucketId,
- const IoHash& Key,
- IoHash& OutPayloadHash,
- std::filesystem::path TempFolderPath = {});
-
- PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
- CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
- CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
- CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
- CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
-
- FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
-
- CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
-
- GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key);
-
- CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key);
-
- CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
- CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
- CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
-
- std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
-
- CloudCacheResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
- CloudCacheResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- CloudCacheResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- PutBuildPartResult PutBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- std::string_view PartName,
- const IoBuffer& Payload);
- CloudCacheResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
- CloudCacheResult PutBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload);
- CloudCacheResult GetBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- std::filesystem::path TempFolderPath);
- CloudCacheResult PutBlockMetadata(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- const IoBuffer& Payload);
- FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& RawHash);
- CloudCacheResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
-
- CloudCacheClient& Client() { return *m_CacheClient; };
-
-private:
- inline LoggerRef Log() { return m_Log; }
-
- CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
-
- CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
-
- LoggerRef m_Log;
- RefPtr<CloudCacheClient> m_CacheClient;
-};
-
-/**
- * Access token provider interface
- */
-class CloudCacheTokenProvider
-{
-public:
- virtual ~CloudCacheTokenProvider() = default;
-
- virtual CloudCacheAccessToken AcquireAccessToken() = 0;
-
- static std::unique_ptr<CloudCacheTokenProvider> CreateFromStaticToken(CloudCacheAccessToken Token);
-
- struct OAuthClientCredentialsParams
- {
- std::string_view Url;
- std::string_view ClientId;
- std::string_view ClientSecret;
- };
-
- static std::unique_ptr<CloudCacheTokenProvider> CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params);
-
- static std::unique_ptr<CloudCacheTokenProvider> CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback);
-};
-
-struct CloudCacheClientOptions
-{
- std::string_view Name;
- std::string_view ServiceUrl;
- std::string_view DdcNamespace;
- std::string_view BlobStoreNamespace;
- std::string_view ComputeCluster;
- std::chrono::milliseconds ConnectTimeout{5000};
- std::chrono::milliseconds Timeout{};
- bool AssumeHttp2 = false;
- bool AllowResume = false;
- uint8_t RetryCount = 0;
-};
-
-/**
- * Jupiter upstream cache client
- */
-class CloudCacheClient : public RefCounted
-{
-public:
- CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider);
- ~CloudCacheClient();
-
- std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
- std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
- std::string_view ComputeCluster() const { return m_ComputeCluster; }
- std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); }
-
- LoggerRef Logger() { return m_Log; }
-
-private:
- LoggerRef m_Log;
- const std::string m_DefaultDdcNamespace;
- const std::string m_DefaultBlobStoreNamespace;
- const std::string m_ComputeCluster;
- const std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider;
- HttpClient m_HttpClient;
-
- friend class CloudCacheSession;
-};
-
-} // namespace zen
diff --git a/src/zenserver/upstream/upstream.h b/src/zenserver/upstream/upstream.h
index a57301206..4d45687fc 100644
--- a/src/zenserver/upstream/upstream.h
+++ b/src/zenserver/upstream/upstream.h
@@ -2,7 +2,6 @@
#pragma once
-#include <upstream/jupiter.h>
#include <upstream/upstreamcache.h>
#include <upstream/upstreamservice.h>
#include <upstream/zen.h>
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index ab8fe8704..e438a840a 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -1,7 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
#include "upstreamcache.h"
-#include "jupiter.h"
#include "zen.h"
#include <zencore/blockingqueue.h>
@@ -15,11 +14,15 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenhttp/auth/authmgr.h>
-#include <zenstore/cidstore.h>
-#include <zenutil/packageformat.h>
+#include <zenhttp/httpclientauth.h>
+#include <zenhttp/packageformat.h>
#include <zenstore/cache/structuredcachestore.h>
+#include <zenstore/cidstore.h>
+
+#include <zenutil/jupiter/jupiterclient.h>
+#include <zenutil/jupiter/jupitersession.h>
+
#include "cache/httpstructuredcache.h"
#include "diag/logging.h"
@@ -85,7 +88,7 @@ namespace detail {
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
- JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
+ JupiterUpstreamEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
: m_AuthMgr(Mgr)
, m_Log(zen::logging::Get("upstream"))
{
@@ -93,30 +96,27 @@ namespace detail {
m_Info.Name = Options.Name;
m_Info.Url = Options.ServiceUrl;
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+ std::function<HttpClientAccessToken()> TokenProvider;
if (AuthConfig.OAuthUrl.empty() == false)
{
- TokenProvider = CloudCacheTokenProvider::CreateFromOAuthClientCredentials(
+ TokenProvider = httpclientauth::CreateFromOAuthClientCredentials(
{.Url = AuthConfig.OAuthUrl, .ClientId = AuthConfig.OAuthClientId, .ClientSecret = AuthConfig.OAuthClientSecret});
}
- else if (AuthConfig.OpenIdProvider.empty() == false)
+ else if (!AuthConfig.OpenIdProvider.empty())
{
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(AuthConfig.OpenIdProvider)]() {
- AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
+ TokenProvider = httpclientauth::CreateFromOpenIdProvider(m_AuthMgr, AuthConfig.OpenIdProvider);
+ }
+ else if (!AuthConfig.AccessToken.empty())
+ {
+ TokenProvider = httpclientauth::CreateFromStaticToken(AuthConfig.AccessToken);
}
else
{
- CloudCacheAccessToken AccessToken{.Value = std::string(AuthConfig.AccessToken),
- .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
-
- TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
+ TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(m_AuthMgr);
}
- m_Client = new CloudCacheClient(Options, std::move(TokenProvider));
+ m_Client = new JupiterClient(Options, std::move(TokenProvider));
}
virtual ~JupiterUpstreamEndpoint() {}
@@ -134,8 +134,8 @@ namespace detail {
return {.State = UpstreamEndpointState::kOk};
}
- CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.Authenticate();
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
+ const JupiterResult Result = Session.Authenticate();
if (Result.Success)
{
@@ -160,20 +160,11 @@ namespace detail {
}
}
- std::string_view GetActualDdcNamespace(CloudCacheSession& Session, std::string_view Namespace)
- {
- if (Namespace == ZenCacheStore::DefaultNamespace)
- {
- return Session.Client().DefaultDdcNamespace();
- }
- return Namespace;
- }
-
- std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, std::string_view Namespace)
+ std::string_view GetActualBlobStoreNamespace(std::string_view Namespace)
{
if (Namespace == ZenCacheStore::DefaultNamespace)
{
- return Session.Client().DefaultBlobStoreNamespace();
+ return m_Client->DefaultBlobStoreNamespace();
}
return Namespace;
}
@@ -190,10 +181,10 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
- CloudCacheResult Result;
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
+ JupiterResult Result;
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
if (Type == ZenContentType::kCompressedBinary)
{
@@ -209,7 +200,7 @@ namespace detail {
int NumAttachments = 0;
CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
Result.ReceivedBytes += AttachmentResult.ReceivedBytes;
Result.SentBytes += AttachmentResult.SentBytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
@@ -249,7 +240,7 @@ namespace detail {
CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) {
- CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
Result.ReceivedBytes += AttachmentResult.ReceivedBytes;
Result.SentBytes += AttachmentResult.SentBytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
@@ -310,7 +301,7 @@ namespace detail {
{
ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheRecords");
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
GetUpstreamCacheResult Result;
for (CacheKeyRequest* Request : Requests)
@@ -322,9 +313,8 @@ namespace detail {
double ElapsedSeconds = 0.0;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- CloudCacheResult RefResult =
- Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ JupiterResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
AppendResult(RefResult, Result);
ElapsedSeconds = RefResult.ElapsedSeconds;
@@ -337,7 +327,7 @@ namespace detail {
{
Record = LoadCompactBinaryObject(RefResult.Response);
Record.IterateAttachments([&](CbFieldView AttachmentHash) {
- CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
+ JupiterResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash());
AppendResult(BlobResult, Result);
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
@@ -375,9 +365,9 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ const JupiterResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -408,7 +398,7 @@ namespace detail {
{
ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheChunks");
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
GetUpstreamCacheResult Result;
for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
@@ -422,8 +412,8 @@ namespace detail {
bool IsCompressed = false;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- const CloudCacheResult BlobResult =
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ const JupiterResult BlobResult =
Request.ChunkId == IoHash::Zero
? Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, Request.ChunkId)
: Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId);
@@ -463,7 +453,7 @@ namespace detail {
{
ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheValues");
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
GetUpstreamCacheResult Result;
for (CacheValueRequest* RequestPtr : CacheValueRequests)
@@ -477,9 +467,9 @@ namespace detail {
bool IsCompressed = false;
if (!Result.Error)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- IoHash PayloadHash;
- const CloudCacheResult BlobResult =
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
+ IoHash PayloadHash;
+ const JupiterResult BlobResult =
Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash);
ElapsedSeconds = BlobResult.ElapsedSeconds;
Payload = BlobResult.Response;
@@ -543,14 +533,14 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
+ JupiterSession Session(m_Client->Logger(), m_Client->Client());
if (CacheRecord.Type == ZenContentType::kBinary)
{
- CloudCacheResult Result;
+ JupiterResult Result;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(CacheRecord.Namespace);
Result = Session.PutRef(BlobStoreNamespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
@@ -632,7 +622,7 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
- static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ static void AppendResult(const JupiterResult& Result, GetUpstreamCacheResult& Out)
{
Out.Success &= Result.Success;
Out.Bytes += gsl::narrow<int64_t>(Result.ReceivedBytes);
@@ -645,7 +635,7 @@ namespace detail {
};
PutUpstreamCacheResult PerformStructuredPut(
- CloudCacheSession& Session,
+ JupiterSession& Session,
std::string_view Namespace,
const CacheKey& Key,
IoBuffer ObjectBuffer,
@@ -655,7 +645,7 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace);
const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
for (const IoHash& ValueContentId : ValueContentIds)
{
@@ -665,7 +655,7 @@ namespace detail {
return false;
}
- CloudCacheResult BlobResult;
+ JupiterResult BlobResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer);
@@ -760,12 +750,12 @@ namespace detail {
LoggerRef Log() { return m_Log; }
- AuthMgr& m_AuthMgr;
- LoggerRef m_Log;
- UpstreamEndpointInfo m_Info;
- UpstreamStatus m_Status;
- UpstreamEndpointStats m_Stats;
- RefPtr<CloudCacheClient> m_Client;
+ AuthMgr& m_AuthMgr;
+ LoggerRef m_Log;
+ UpstreamEndpointInfo m_Info;
+ UpstreamStatus m_Status;
+ UpstreamEndpointStats m_Stats;
+ RefPtr<JupiterClient> m_Client;
};
class ZenUpstreamEndpoint final : public UpstreamEndpoint
@@ -2129,7 +2119,7 @@ UpstreamEndpoint::CreateZenEndpoint(const ZenStructuredCacheClientOptions& Optio
}
std::unique_ptr<UpstreamEndpoint>
-UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
+UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr)
{
return std::make_unique<detail::JupiterUpstreamEndpoint>(Options, AuthConfig, Mgr);
}
diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h
index bb0193e4e..26e5decac 100644
--- a/src/zenserver/upstream/upstreamcache.h
+++ b/src/zenserver/upstream/upstreamcache.h
@@ -26,8 +26,8 @@ class CbPackage;
class CbObjectWriter;
class CidStore;
class ZenCacheStore;
-struct CloudCacheClientOptions;
-class CloudCacheTokenProvider;
+struct JupiterClientOptions;
+class JupiterAccessTokenProvider;
struct ZenStructuredCacheClientOptions;
struct UpstreamEndpointStats
@@ -128,9 +128,9 @@ public:
static std::unique_ptr<UpstreamEndpoint> CreateZenEndpoint(const ZenStructuredCacheClientOptions& Options);
- static std::unique_ptr<UpstreamEndpoint> CreateJupiterEndpoint(const CloudCacheClientOptions& Options,
- const UpstreamAuthConfig& AuthConfig,
- AuthMgr& Mgr);
+ static std::unique_ptr<UpstreamEndpoint> CreateJupiterEndpoint(const JupiterClientOptions& Options,
+ const UpstreamAuthConfig& AuthConfig,
+ AuthMgr& Mgr);
};
/**
diff --git a/src/zenserver/upstream/upstreamservice.cpp b/src/zenserver/upstream/upstreamservice.cpp
index 3d4a0f823..1dcbdb604 100644
--- a/src/zenserver/upstream/upstreamservice.cpp
+++ b/src/zenserver/upstream/upstreamservice.cpp
@@ -2,7 +2,6 @@
#include <upstream/upstreamservice.h>
#include <upstream/upstreamcache.h>
-#include <zenhttp/auth/authmgr.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/string.h>
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp
index c031a4086..7494ae379 100644
--- a/src/zenserver/upstream/zen.cpp
+++ b/src/zenserver/upstream/zen.cpp
@@ -10,7 +10,7 @@
#include <zencore/stream.h>
#include <zenhttp/formatters.h>
#include <zenhttp/httpcommon.h>
-#include <zenutil/packageformat.h>
+#include <zenhttp/packageformat.h>
#include <zenstore/cache/structuredcachestore.h>
#include "diag/logging.h"
diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp
index 9444f7644..2d59c9357 100644
--- a/src/zenserver/workspaces/httpworkspaces.cpp
+++ b/src/zenserver/workspaces/httpworkspaces.cpp
@@ -2,12 +2,12 @@
#include <workspaces/httpworkspaces.h>
+#include <zencore/basicfile.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/trace.h>
#include <zenstore/workspaces.h>
-#include <zenutil/basicfile.h>
#include <zenutil/chunkrequests.h>
#include <zenutil/workerpools.h>
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 66b6cb858..f84bc0b00 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -5,6 +5,7 @@
#include "sentryintegration.h"
#include <zenbase/refcount.h>
+#include <zencore/basicfile.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/config.h>
@@ -25,7 +26,7 @@
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
#include <zenstore/workspaces.h>
-#include <zenutil/basicfile.h>
+#include <zenutil/jupiter/jupiterclient.h>
#include <zenutil/workerpools.h>
#include <zenutil/zenserverprocess.h>
@@ -598,12 +599,12 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
{
std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name;
- auto Options = CloudCacheClientOptions{.Name = EndpointName,
- .ServiceUrl = UpstreamConfig.JupiterConfig.Url,
- .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace,
- .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace,
- .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
- .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
+ auto Options = JupiterClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.JupiterConfig.Url,
+ .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace,
+ .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
auto AuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl,
.OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId,