aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-12-12 08:27:54 +0100
committerGitHub Enterprise <[email protected]>2024-12-12 08:27:54 +0100
commit9bb2bf10a76127fea1db01fab42c795bdc07c936 (patch)
tree4bdb9d40ee265798afe4ec439dea45b7a7c5ed3c /src
parentMemory tracking improvements (#262) (diff)
downloadzen-9bb2bf10a76127fea1db01fab42c795bdc07c936.tar.xz
zen-9bb2bf10a76127fea1db01fab42c795bdc07c936.zip
Builds API remote project store (#258)
Feature: zen command oplog-export and oplog-import now supports --builds remote target using the Jupiter builds API
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp339
-rw-r--r--src/zen/cmds/projectstore_cmd.h43
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.cpp561
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.h29
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp52
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp90
-rw-r--r--src/zenserver/projectstore/projectstore.cpp74
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp323
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h41
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp20
-rw-r--r--src/zenserver/upstream/jupiter.cpp151
-rw-r--r--src/zenserver/upstream/jupiter.h48
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp4
13 files changed, 1465 insertions, 310 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 1e4f2675a..5e18a3624 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -29,7 +29,7 @@ namespace {
using namespace std::literals;
- const std::string DefaultCloudAccessTokenEnvVariableName(
+ const std::string DefaultJupiterAccessTokenEnvVariableName(
#if ZEN_PLATFORM_WINDOWS
"UE-CloudDataCacheAccessToken"sv
#endif
@@ -39,7 +39,7 @@ namespace {
);
- std::string ReadCloudAccessTokenFromFile(const std::filesystem::path& Path)
+ std::string ReadJupiterAccessTokenFromFile(const std::filesystem::path& Path)
{
if (!std::filesystem::is_regular_file(Path))
{
@@ -839,43 +839,66 @@ ExportOplogCommand::ExportOplogCommand()
"<disable>");
m_Options.add_option("", "a", "async", "Trigger export but don't wait for completion", cxxopts::value(m_Async), "<async>");
- m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
- m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>");
- m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>");
- m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>");
- m_Options.add_option("cloud",
+ m_Options.add_option("", "", "namespace", "Cloud/Builds Storage namespace", cxxopts::value(m_JupiterNamespace), "<namespace>");
+ m_Options.add_option("", "", "bucket", "Cloud/Builds Storage bucket", cxxopts::value(m_JupiterBucket), "<bucket>");
+ m_Options.add_option("",
"",
- "basekey",
- "Optional Base Cloud Storage key for incremental export",
- cxxopts::value(m_BaseCloudKey),
- "<key>");
+ "openid-provider",
+ "Cloud/Builds Storage openid provider",
+ cxxopts::value(m_JupiterOpenIdProvider),
+ "<provider>");
m_Options
- .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>");
- m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>");
- m_Options.add_option("cloud",
+ .add_option("", "", "access-token", "Cloud/Builds Storage access token", cxxopts::value(m_JupiterAccessToken), "<accesstoken>");
+ m_Options.add_option("",
"",
"access-token-env",
- "Name of environment variable that holds the cloud Storage access token",
- cxxopts::value(m_CloudAccessTokenEnv)->default_value(DefaultCloudAccessTokenEnvVariableName),
+ "Name of environment variable that holds the cloud/builds Storage access token",
+ cxxopts::value(m_JupiterAccessTokenEnv)->default_value(DefaultJupiterAccessTokenEnvVariableName),
"<envvariable>");
- m_Options.add_option("cloud",
+ m_Options.add_option("",
"",
"access-token-path",
- "Path to json file that holds the cloud Storage access token",
- cxxopts::value(m_CloudAccessTokenPath),
+ "Path to json file that holds the cloud/builds Storage access token",
+ cxxopts::value(m_JupiterAccessTokenPath),
"<filepath>");
- m_Options.add_option("cloud",
+ m_Options.add_option("",
"",
"assume-http2",
- "Assume that the cloud endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake",
- cxxopts::value(m_CloudAssumeHttp2),
+ "Assume that the cloud/builds endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake",
+ cxxopts::value(m_JupiterAssumeHttp2),
"<assumehttp2>");
+ m_Options.add_option(
+ "",
+ "",
+ "disabletempblocks",
+ "Disable temp block creation and upload blocks without waiting for oplog container to be uploaded for cloud/builds storage",
+ cxxopts::value(m_JupiterDisableTempBlocks),
+ "<disable>");
+
+ m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
+ m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>");
m_Options.add_option("cloud",
"",
- "disabletempblocks",
- "Disable temp block creation and upload blocks without waiting for oplog container to be uploaded",
- cxxopts::value(m_CloudDisableTempBlocks),
- "<disable>");
+ "basekey",
+ "Optional Base Cloud Storage key for incremental export",
+ cxxopts::value(m_BaseCloudKey),
+ "<key>");
+
+ m_Options.add_option("", "", "builds", "Builds Storage API URL", cxxopts::value(m_BuildsUrl), "<url>");
+ m_Options.add_option("builds", "", "builds-id", "Builds Id", cxxopts::value(m_BuildsId), "<id>");
+
+ m_Options.add_option("builds",
+ "",
+ "builds-metadata-path",
+ "Path to json file that holds the metadata for the build",
+ cxxopts::value(m_BuildsMetadataPath),
+ "<metadata-path>");
+ m_Options.add_option("builds",
+ "",
+ "builds-metadata",
+ "Key-value pairs separated by ';' with build meta data. (key1=value1;key2=value2)",
+ cxxopts::value(m_BuildsMetadata),
+ "<metadata>");
m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>");
m_Options.add_option("zen", "", "target-project", "Zen target project name", cxxopts::value(m_ZenProjectName), "<targetprojectid>");
@@ -945,6 +968,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
size_t TargetCount = 0;
TargetCount += m_CloudUrl.empty() ? 0 : 1;
+ TargetCount += m_BuildsUrl.empty() ? 0 : 1;
TargetCount += m_ZenUrl.empty() ? 0 : 1;
TargetCount += m_FileDirectoryPath.empty() ? 0 : 1;
if (TargetCount != 1)
@@ -961,7 +985,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (!m_CloudUrl.empty())
{
- if (m_CloudNamespace.empty() || m_CloudBucket.empty())
+ if (m_JupiterNamespace.empty() || m_JupiterBucket.empty())
{
ZEN_ERROR("Options for cloud target are missing");
ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str());
@@ -969,13 +993,40 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
}
if (m_CloudKey.empty())
{
- std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket);
+ std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_JupiterNamespace, m_JupiterBucket);
IoHash Key = IoHash::HashBuffer(KeyString.data(), KeyString.size());
m_CloudKey = Key.ToHexString();
ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey);
}
}
+ if (!m_BuildsUrl.empty())
+ {
+ if (m_JupiterNamespace.empty() || m_JupiterBucket.empty())
+ {
+ ZEN_ERROR("Options for builds target are missing");
+ ZEN_CONSOLE("{}", m_Options.help({"builds"}).c_str());
+ return 1;
+ }
+ if (m_BuildsMetadataPath.empty() && m_BuildsMetadata.empty())
+ {
+ ZEN_ERROR("Options for builds target are missing");
+ ZEN_CONSOLE("{}", m_Options.help({"builds"}).c_str());
+ return 1;
+ }
+ if (!m_BuildsMetadataPath.empty() && !m_BuildsMetadata.empty())
+ {
+ ZEN_ERROR("Conflicting options for builds target");
+ ZEN_CONSOLE("{}", m_Options.help({"builds"}).c_str());
+ return 1;
+ }
+ if (m_BuildsId.empty())
+ {
+ m_BuildsId = Oid::NewOid().ToString();
+ ZEN_CONSOLE("Using generated builds id: {}", m_BuildsId);
+ }
+ }
+
if (!m_ZenUrl.empty())
{
if (m_ZenProjectName.empty())
@@ -1101,32 +1152,32 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
Writer.BeginObject("cloud"sv);
{
Writer.AddString("url"sv, m_CloudUrl);
- Writer.AddString("namespace"sv, m_CloudNamespace);
- Writer.AddString("bucket"sv, m_CloudBucket);
+ Writer.AddString("namespace"sv, m_JupiterNamespace);
+ Writer.AddString("bucket"sv, m_JupiterBucket);
Writer.AddString("key"sv, m_CloudKey);
if (!m_BaseCloudKey.empty())
{
Writer.AddString("basekey"sv, m_BaseCloudKey);
}
- if (!m_CloudOpenIdProvider.empty())
+ if (!m_JupiterOpenIdProvider.empty())
{
- Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider);
+ Writer.AddString("openid-provider"sv, m_JupiterOpenIdProvider);
}
- if (!m_CloudAccessToken.empty())
+ if (!m_JupiterAccessToken.empty())
{
- Writer.AddString("access-token"sv, m_CloudAccessToken);
+ Writer.AddString("access-token"sv, m_JupiterAccessToken);
}
- if (!m_CloudAccessTokenPath.empty())
+ if (!m_JupiterAccessTokenPath.empty())
{
- std::string ResolvedCloudAccessToken = ReadCloudAccessTokenFromFile(m_CloudAccessTokenPath);
+ std::string ResolvedCloudAccessToken = ReadJupiterAccessTokenFromFile(m_JupiterAccessTokenPath);
if (!ResolvedCloudAccessToken.empty())
{
Writer.AddString("access-token"sv, ResolvedCloudAccessToken);
}
}
- if (!m_CloudAccessTokenEnv.empty())
+ if (!m_JupiterAccessTokenEnv.empty())
{
- std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_CloudAccessTokenEnv);
+ std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_JupiterAccessTokenEnv);
if (!ResolvedCloudAccessTokenEnv.empty())
{
@@ -1134,10 +1185,10 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
}
else
{
- Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv);
+ Writer.AddString("access-token-env"sv, m_JupiterAccessTokenEnv);
}
}
- if (m_CloudAssumeHttp2)
+ if (m_JupiterAssumeHttp2)
{
Writer.AddBool("assumehttp2"sv, true);
}
@@ -1145,7 +1196,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
Writer.AddBool("disableblocks"sv, true);
}
- if (m_CloudDisableTempBlocks)
+ if (m_JupiterDisableTempBlocks)
{
Writer.AddBool("disabletempblocks"sv, true);
}
@@ -1153,12 +1204,89 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
Writer.EndObject(); // "cloud"
TargetDescription = fmt::format("[cloud] {}/{}/{}/{}{}{}",
m_CloudUrl,
- m_CloudNamespace,
- m_CloudBucket,
+ m_JupiterNamespace,
+ m_JupiterBucket,
m_CloudKey,
m_BaseCloudKey.empty() ? "" : " Base: ",
m_BaseCloudKey);
}
+ if (!m_BuildsUrl.empty())
+ {
+ Writer.BeginObject("builds"sv);
+ {
+ Writer.AddString("url"sv, m_BuildsUrl);
+ Writer.AddString("namespace"sv, m_JupiterNamespace);
+ Writer.AddString("bucket"sv, m_JupiterBucket);
+ Writer.AddString("buildsid"sv, m_BuildsId);
+ if (!m_JupiterOpenIdProvider.empty())
+ {
+ Writer.AddString("openid-provider"sv, m_JupiterOpenIdProvider);
+ }
+ if (!m_JupiterAccessToken.empty())
+ {
+ Writer.AddString("access-token"sv, m_JupiterAccessToken);
+ }
+ if (!m_JupiterAccessTokenPath.empty())
+ {
+ std::string ResolvedCloudAccessToken = ReadJupiterAccessTokenFromFile(m_JupiterAccessTokenPath);
+ if (!ResolvedCloudAccessToken.empty())
+ {
+ Writer.AddString("access-token"sv, ResolvedCloudAccessToken);
+ }
+ }
+ if (!m_JupiterAccessTokenEnv.empty())
+ {
+ std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_JupiterAccessTokenEnv);
+
+ if (!ResolvedCloudAccessTokenEnv.empty())
+ {
+ Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv);
+ }
+ else
+ {
+ Writer.AddString("access-token-env"sv, m_JupiterAccessTokenEnv);
+ }
+ }
+ if (m_JupiterAssumeHttp2)
+ {
+ Writer.AddBool("assumehttp2"sv, true);
+ }
+ if (m_DisableBlocks)
+ {
+ Writer.AddBool("disableblocks"sv, true);
+ }
+ if (m_JupiterDisableTempBlocks)
+ {
+ Writer.AddBool("disabletempblocks"sv, true);
+ }
+
+ if (!m_BuildsMetadataPath.empty())
+ {
+ std::filesystem::path MetadataPath(m_BuildsMetadataPath);
+ IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten();
+ std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize());
+ CbFieldIterator MetaData = LoadCompactBinaryFromJson(Json);
+ Writer.AddBinary("metadata"sv, MetaData.GetBuffer());
+ }
+ if (!m_BuildsMetadata.empty())
+ {
+ CbObjectWriter MetaDataWriter(m_BuildsMetadata.length());
+ ForEachStrTok(m_BuildsMetadata, ';', [&](std::string_view Pair) {
+ size_t SplitPos = Pair.find('=');
+ if (SplitPos == std::string::npos || SplitPos == 0)
+ {
+ throw std::runtime_error(fmt::format("builds metadata key-value pair '{}' is malformed", Pair));
+ }
+ MetaDataWriter.AddString(Pair.substr(0, SplitPos), Pair.substr(SplitPos + 1));
+ return true;
+ });
+ CbObject MetaData = MetaDataWriter.Save();
+ Writer.AddBinary("metadata"sv, MetaData.GetBuffer());
+ }
+ }
+ Writer.EndObject(); // "builds"
+ TargetDescription = fmt::format("[builds] {}/{}/{}/{}", m_CloudUrl, m_JupiterNamespace, m_JupiterBucket, m_BuildsId);
+ }
if (!m_ZenUrl.empty())
{
Writer.BeginObject("zen"sv);
@@ -1230,32 +1358,41 @@ ImportOplogCommand::ImportOplogCommand()
cxxopts::value(m_IgnoreMissingAttachments),
"<ignore>");
- m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
- m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>");
- m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>");
- m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>");
+ m_Options.add_option("", "", "namespace", "Cloud/Builds Storage namespace", cxxopts::value(m_JupiterNamespace), "<namespace>");
+ m_Options.add_option("", "", "bucket", "Cloud/Builds Storage bucket", cxxopts::value(m_JupiterBucket), "<bucket>");
+ m_Options.add_option("",
+ "",
+ "openid-provider",
+ "Cloud/Builds Storage openid provider",
+ cxxopts::value(m_JupiterOpenIdProvider),
+ "<provider>");
m_Options
- .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>");
- m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>");
- m_Options.add_option("cloud",
+ .add_option("", "", "access-token", "Cloud/Builds Storage access token", cxxopts::value(m_JupiterAccessToken), "<accesstoken>");
+ m_Options.add_option("",
"",
"access-token-env",
- "Name of environment variable that holds the cloud Storage access token",
- cxxopts::value(m_CloudAccessTokenEnv)->default_value(DefaultCloudAccessTokenEnvVariableName),
+ "Name of environment variable that holds the cloud/builds Storage access token",
+ cxxopts::value(m_JupiterAccessTokenEnv)->default_value(DefaultJupiterAccessTokenEnvVariableName),
"<envvariable>");
- m_Options.add_option("cloud",
+ m_Options.add_option("",
"",
"access-token-path",
- "Path to json file that holds the cloud Storage access token",
- cxxopts::value(m_CloudAccessTokenPath),
+ "Path to json file that holds the cloud/builds Storage access token",
+ cxxopts::value(m_JupiterAccessTokenPath),
"<filepath>");
- m_Options.add_option("cloud",
+ m_Options.add_option("",
"",
"assume-http2",
- "Assume that the cloud endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake",
- cxxopts::value(m_CloudAssumeHttp2),
+ "Assume that the cloud/builds endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake",
+ cxxopts::value(m_JupiterAssumeHttp2),
"<assumehttp2>");
+ m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
+ m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>");
+
+ m_Options.add_option("", "", "builds", "Builds Storage URL", cxxopts::value(m_BuildsUrl), "<url>");
+ m_Options.add_option("builds", "", "builds-id", "Builds Id", cxxopts::value(m_BuildsId), "<id>");
+
m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>");
m_Options.add_option("zen", "", "source-project", "Zen source project name", cxxopts::value(m_ZenProjectName), "<sourceprojectid>");
m_Options.add_option("zen", "", "source-oplog", "Zen source oplog name", cxxopts::value(m_ZenOplogName), "<sourceoplogid>");
@@ -1319,6 +1456,7 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
size_t TargetCount = 0;
TargetCount += m_CloudUrl.empty() ? 0 : 1;
+ TargetCount += m_BuildsUrl.empty() ? 0 : 1;
TargetCount += m_ZenUrl.empty() ? 0 : 1;
TargetCount += m_FileDirectoryPath.empty() ? 0 : 1;
if (TargetCount != 1)
@@ -1330,7 +1468,7 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (!m_CloudUrl.empty())
{
- if (m_CloudNamespace.empty() || m_CloudBucket.empty())
+ if (m_JupiterNamespace.empty() || m_JupiterBucket.empty())
{
ZEN_ERROR("Options for cloud source are missing");
ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str());
@@ -1338,13 +1476,23 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
}
if (m_CloudKey.empty())
{
- std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket);
+ std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_JupiterNamespace, m_JupiterBucket);
IoHash Key = IoHash::HashBuffer(KeyString.data(), KeyString.size());
m_CloudKey = Key.ToHexString();
ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey);
}
}
+ if (!m_BuildsUrl.empty())
+ {
+ if (m_JupiterNamespace.empty() || m_JupiterBucket.empty() || m_BuildsId.empty())
+ {
+ ZEN_ERROR("Options for builds source are missing");
+ ZEN_CONSOLE("{}", m_Options.help({"builds"}).c_str());
+ return 1;
+ }
+ }
+
if (!m_ZenUrl.empty())
{
if (m_ZenProjectName.empty())
@@ -1434,28 +1582,28 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
Writer.BeginObject("cloud"sv);
{
Writer.AddString("url"sv, m_CloudUrl);
- Writer.AddString("namespace"sv, m_CloudNamespace);
- Writer.AddString("bucket"sv, m_CloudBucket);
+ Writer.AddString("namespace"sv, m_JupiterNamespace);
+ Writer.AddString("bucket"sv, m_JupiterBucket);
Writer.AddString("key"sv, m_CloudKey);
- if (!m_CloudOpenIdProvider.empty())
+ if (!m_JupiterOpenIdProvider.empty())
{
- Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider);
+ Writer.AddString("openid-provider"sv, m_JupiterOpenIdProvider);
}
- if (!m_CloudAccessToken.empty())
+ if (!m_JupiterAccessToken.empty())
{
- Writer.AddString("access-token"sv, m_CloudAccessToken);
+ Writer.AddString("access-token"sv, m_JupiterAccessToken);
}
- if (!m_CloudAccessTokenPath.empty())
+ if (!m_JupiterAccessTokenPath.empty())
{
- std::string ResolvedCloudAccessToken = ReadCloudAccessTokenFromFile(m_CloudAccessTokenPath);
+ std::string ResolvedCloudAccessToken = ReadJupiterAccessTokenFromFile(m_JupiterAccessTokenPath);
if (!ResolvedCloudAccessToken.empty())
{
Writer.AddString("access-token"sv, ResolvedCloudAccessToken);
}
}
- if (!m_CloudAccessTokenEnv.empty())
+ if (!m_JupiterAccessTokenEnv.empty())
{
- std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_CloudAccessTokenEnv);
+ std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_JupiterAccessTokenEnv);
if (!ResolvedCloudAccessTokenEnv.empty())
{
@@ -1463,16 +1611,61 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
}
else
{
- Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv);
+ Writer.AddString("access-token-env"sv, m_JupiterAccessTokenEnv);
}
}
- if (m_CloudAssumeHttp2)
+ if (m_JupiterAssumeHttp2)
{
Writer.AddBool("assumehttp2"sv, true);
}
}
Writer.EndObject(); // "cloud"
- SourceDescription = fmt::format("[cloud] {}/{}/{}/{}", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey);
+ SourceDescription = fmt::format("[cloud] {}/{}/{}/{}", m_CloudUrl, m_JupiterNamespace, m_JupiterBucket, m_CloudKey);
+ }
+ if (!m_BuildsUrl.empty())
+ {
+ Writer.BeginObject("builds"sv);
+ {
+ Writer.AddString("url"sv, m_BuildsUrl);
+ Writer.AddString("namespace"sv, m_JupiterNamespace);
+ Writer.AddString("bucket"sv, m_JupiterBucket);
+ Writer.AddString("buildsid"sv, m_BuildsId);
+ if (!m_JupiterOpenIdProvider.empty())
+ {
+ Writer.AddString("openid-provider"sv, m_JupiterOpenIdProvider);
+ }
+ if (!m_JupiterAccessToken.empty())
+ {
+ Writer.AddString("access-token"sv, m_JupiterAccessToken);
+ }
+ if (!m_JupiterAccessTokenPath.empty())
+ {
+ std::string ResolvedCloudAccessToken = ReadJupiterAccessTokenFromFile(m_JupiterAccessTokenPath);
+ if (!ResolvedCloudAccessToken.empty())
+ {
+ Writer.AddString("access-token"sv, ResolvedCloudAccessToken);
+ }
+ }
+ if (!m_JupiterAccessTokenEnv.empty())
+ {
+ std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_JupiterAccessTokenEnv);
+
+ if (!ResolvedCloudAccessTokenEnv.empty())
+ {
+ Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv);
+ }
+ else
+ {
+ Writer.AddString("access-token-env"sv, m_JupiterAccessTokenEnv);
+ }
+ }
+ if (m_JupiterAssumeHttp2)
+ {
+ Writer.AddBool("assumehttp2"sv, true);
+ }
+ }
+ Writer.EndObject(); // "builds"
+ SourceDescription = fmt::format("[builds] {}/{}/{}/{}", m_CloudUrl, m_JupiterNamespace, m_JupiterBucket, m_BuildsId);
}
if (!m_ZenUrl.empty())
{
diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h
index 3a9d5dcb8..e66e98414 100644
--- a/src/zen/cmds/projectstore_cmd.h
+++ b/src/zen/cmds/projectstore_cmd.h
@@ -103,18 +103,25 @@ private:
bool m_DisableBlocks = false;
bool m_Async = false;
+ std::string m_JupiterNamespace;
+ std::string m_JupiterBucket;
+ std::string m_JupiterOpenIdProvider;
+ std::string m_JupiterAccessToken;
+ std::string m_JupiterAccessTokenEnv;
+ std::string m_JupiterAccessTokenPath;
+ bool m_JupiterAssumeHttp2 = false;
+ bool m_JupiterDisableTempBlocks = false;
+
std::string m_CloudUrl;
- std::string m_CloudNamespace;
- std::string m_CloudBucket;
std::string m_CloudKey;
std::string m_BaseCloudKey;
- std::string m_CloudOpenIdProvider;
- std::string m_CloudAccessToken;
- std::string m_CloudAccessTokenEnv;
- std::string m_CloudAccessTokenPath;
- bool m_CloudAssumeHttp2 = false;
- bool m_CloudDisableTempBlocks = false;
- bool m_IgnoreMissingAttachments = false;
+
+ std::string m_BuildsUrl;
+ std::string m_BuildsId;
+ std::string m_BuildsMetadataPath;
+ std::string m_BuildsMetadata;
+
+ bool m_IgnoreMissingAttachments = false;
std::string m_ZenUrl;
std::string m_ZenProjectName;
@@ -152,15 +159,19 @@ private:
bool m_Clean = false;
bool m_PlainProgress = false;
+ std::string m_JupiterNamespace;
+ std::string m_JupiterBucket;
+ std::string m_JupiterOpenIdProvider;
+ std::string m_JupiterAccessToken;
+ std::string m_JupiterAccessTokenEnv;
+ std::string m_JupiterAccessTokenPath;
+ bool m_JupiterAssumeHttp2 = false;
+
std::string m_CloudUrl;
- std::string m_CloudNamespace;
- std::string m_CloudBucket;
std::string m_CloudKey;
- std::string m_CloudOpenIdProvider;
- std::string m_CloudAccessToken;
- std::string m_CloudAccessTokenEnv;
- std::string m_CloudAccessTokenPath;
- bool m_CloudAssumeHttp2 = false;
+
+ std::string m_BuildsUrl;
+ std::string m_BuildsId;
std::string m_ZenUrl;
std::string m_ZenProjectName;
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
new file mode 100644
index 000000000..6d0d51a60
--- /dev/null
+++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
@@ -0,0 +1,561 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "buildsremoteprojectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+
+#include <upstream/jupiter.h>
+#include <zenhttp/auth/authmgr.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+static const std::string_view OplogContainerPartName = "oplogcontainer"sv;
+
+class BuildsRemoteStore : public RemoteProjectStore
+{
+public:
+ BuildsRemoteStore(Ref<CloudCacheClient>&& CloudClient,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const Oid& BuildId,
+ const IoBuffer& MetaData,
+ bool ForceDisableBlocks,
+ bool ForceDisableTempBlocks,
+ const std::filesystem::path& TempFilePath)
+ : m_CloudClient(std::move(CloudClient))
+ , m_Namespace(Namespace)
+ , m_Bucket(Bucket)
+ , m_BuildId(BuildId)
+ , m_MetaData(MetaData)
+ , m_TempFilePath(TempFilePath)
+ {
+ m_MetaData.MakeOwned();
+ if (ForceDisableBlocks)
+ {
+ m_EnableBlocks = false;
+ }
+ if (ForceDisableTempBlocks)
+ {
+ m_UseTempBlocks = false;
+ }
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = m_EnableBlocks,
+ .UseTempBlockFiles = m_UseTempBlocks,
+ .AllowChunking = true,
+ .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId),
+ .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)};
+ }
+
+ virtual Stats GetStats() const override
+ {
+ return {.m_SentBytes = m_SentBytes.load(),
+ .m_ReceivedBytes = m_ReceivedBytes.load(),
+ .m_RequestTimeNS = m_RequestTimeNS.load(),
+ .m_RequestCount = m_RequestCount.load(),
+ .m_PeakSentBytes = m_PeakSentBytes.load(),
+ .m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
+ .m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
+ }
+
+ virtual CreateContainerResult CreateContainer() override
+ {
+ ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
+
+ CloudCacheSession Session(m_CloudClient.Get());
+
+ IoBuffer Payload = m_MetaData;
+ Payload.SetContentType(ZenContentType::kCbObject);
+ CloudCacheResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload);
+ AddStats(PutResult);
+
+ CreateContainerResult Result{ConvertResult(PutResult)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Result.Reason);
+ }
+ m_OplogBuildPartId = Oid::NewOid();
+ return Result;
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
+ CloudCacheSession Session(m_CloudClient.Get());
+ PutBuildPartResult PutResult =
+ Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload);
+ AddStats(PutResult);
+
+ SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Result.Reason);
+ }
+
+ return Result;
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) override
+ {
+ ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
+ CloudCacheSession Session(m_CloudClient.Get());
+
+ CloudCacheResult PutResult =
+ Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ AddStats(PutResult);
+
+ SaveAttachmentResult Result{ConvertResult(PutResult)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ RawHash,
+ Result.Reason);
+ return Result;
+ }
+
+ if (Block.BlockHash == RawHash)
+ {
+ ZEN_ASSERT(Block.ChunkLengths.size() == Block.ChunkHashes.size());
+ CbObjectWriter Writer;
+ Writer.AddHash("rawHash"sv, RawHash);
+ Writer.BeginArray("rawHashes"sv);
+ {
+ for (const IoHash& ChunkHash : Block.ChunkHashes)
+ {
+ Writer.AddHash(ChunkHash);
+ }
+ }
+ Writer.EndArray();
+ Writer.BeginArray("chunkLengths");
+ {
+ for (uint32_t ChunkSize : Block.ChunkLengths)
+ {
+ Writer.AddInteger(ChunkSize);
+ }
+ }
+ Writer.EndArray();
+ Writer.BeginArray("chunkOffsets");
+ {
+ ZEN_ASSERT(Block.FirstChunkOffset != (uint32_t)-1);
+ uint32_t Offset = Block.FirstChunkOffset;
+ for (uint32_t ChunkSize : Block.ChunkLengths)
+ {
+ Writer.AddInteger(Offset);
+ Offset += ChunkSize;
+ }
+ }
+ Writer.EndArray();
+
+ Writer.BeginObject("metadata"sv);
+ {
+ Writer.AddString("createdBy", "zenserver");
+ }
+ Writer.EndObject();
+
+ IoBuffer MetaPayload = Writer.Save().GetBuffer().AsIoBuffer();
+ MetaPayload.SetContentType(ZenContentType::kCbObject);
+ CloudCacheResult PutMetaResult =
+ Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload);
+ AddStats(PutMetaResult);
+ RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult);
+ if (MetaDataResult.ErrorCode)
+ {
+ ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ RawHash,
+ MetaDataResult.Reason);
+ }
+ }
+ return Result;
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ SaveAttachmentsResult Result;
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
+ SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {});
+ if (ChunkResult.ErrorCode)
+ {
+ return SaveAttachmentsResult{ChunkResult};
+ }
+ }
+ return Result;
+ }
+
+ virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override
+ {
+ ZEN_UNUSED(RawHash);
+ ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
+
+ CloudCacheSession Session(m_CloudClient.Get());
+ FinalizeBuildPartResult FinalizeRefResult =
+ Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash);
+ AddStats(FinalizeRefResult);
+
+ FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Result.Reason);
+ }
+ else if (Result.Needs.empty())
+ {
+ CloudCacheResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId);
+ AddStats(FinalizeBuildResult);
+ FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds;
+ Result = {ConvertResult(FinalizeBuildResult)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ FinalizeBuildResult.Reason);
+ }
+ }
+ return Result;
+ }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
+
+ CloudCacheSession Session(m_CloudClient.Get());
+ CloudCacheResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId);
+ AddStats(GetBuildResult);
+ LoadContainerResult Result{ConvertResult(GetBuildResult)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Result.Reason);
+ return Result;
+ }
+ CbObject BuildObject = LoadCompactBinaryObject(GetBuildResult.Response);
+ if (!BuildObject)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv,
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId);
+ return Result;
+ }
+ CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
+ if (!PartsObject)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv,
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId);
+ return Result;
+ }
+ m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
+ if (m_OplogBuildPartId == Oid::Zero)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv,
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ OplogContainerPartName);
+ return Result;
+ }
+
+ CloudCacheResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId);
+ AddStats(GetBuildPartResult);
+ Result = {ConvertResult(GetBuildResult)};
+ Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds;
+ if (Result.ErrorCode)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Result.Reason);
+ return Result;
+ }
+
+ CbObject ContainerObject = LoadCompactBinaryObject(GetBuildPartResult.Response);
+ if (!ContainerObject)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv,
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId);
+ return Result;
+ }
+ Result.ContainerObject = std::move(ContainerObject);
+ return Result;
+ }
+
+ virtual GetKnownBlocksResult GetKnownBlocks() override
+ {
+ ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
+ CloudCacheSession Session(m_CloudClient.Get());
+ CloudCacheResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId);
+ AddStats(FindResult);
+ GetKnownBlocksResult Result{ConvertResult(FindResult)};
+ if (Result.ErrorCode)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Result.Reason);
+ return Result;
+ }
+ CbObject BlocksObject = LoadCompactBinaryObject(FindResult.Response);
+ if (!BlocksObject)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv,
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId);
+ return Result;
+ }
+
+ CbArrayView Blocks = BlocksObject["blocks"].AsArrayView();
+ Result.Blocks.reserve(Blocks.Num());
+ for (CbFieldView BlockView : Blocks)
+ {
+ CbObjectView BlockObject = BlockView.AsObjectView();
+ IoHash BlockHash = BlockObject["rawHash"sv].AsHash();
+ if (BlockHash != IoHash::Zero)
+ {
+ CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView();
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkView : ChunksArray)
+ {
+ ChunkHashes.push_back(ChunkView.AsHash());
+ }
+ Result.Blocks.emplace_back(Block{.BlockHash = BlockHash, .ChunkHashes = ChunkHashes});
+ }
+ }
+ return Result;
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
+ CloudCacheSession Session(m_CloudClient.Get());
+ CloudCacheResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath);
+ AddStats(GetResult);
+
+ LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
+ if (GetResult.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}&{}/{}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ RawHash,
+ Result.Reason);
+ }
+ return Result;
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ LoadAttachmentsResult Result;
+ for (const IoHash& Hash : RawHashes)
+ {
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
+ if (ChunkResult.ErrorCode)
+ {
+ return LoadAttachmentsResult{ChunkResult};
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
+ }
+ return Result;
+ }
+
+private:
+ void AddStats(const CloudCacheResult& Result)
+ {
+ m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes));
+ m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes));
+ m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000));
+ SetAtomicMax(m_PeakSentBytes, Result.SentBytes);
+ SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes);
+ if (Result.ElapsedSeconds > 0.0)
+ {
+ uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds);
+ SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
+ }
+
+ m_RequestCount.fetch_add(1);
+ }
+
+ static Result ConvertResult(const CloudCacheResult& Response)
+ {
+ std::string Text;
+ int32_t ErrorCode = 0;
+ if (Response.ErrorCode != 0 || !Response.Success)
+ {
+ if (Response.Response)
+ {
+ HttpContentType ContentType = Response.Response.GetContentType();
+ if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON)
+ {
+ ExtendableStringBuilder<256> SB;
+ SB.Append("\n");
+ SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()),
+ Response.Response.GetSize()));
+ Text = SB.ToString();
+ }
+ else if (ContentType == ZenContentType::kCbObject)
+ {
+ ExtendableStringBuilder<256> SB;
+ SB.Append("\n");
+ CompactBinaryToJson(Response.Response.GetView(), SB);
+ Text = SB.ToString();
+ }
+ }
+ }
+ if (Response.ErrorCode != 0)
+ {
+ ErrorCode = Response.ErrorCode;
+ }
+ else if (!Response.Success)
+ {
+ ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ }
+ return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text};
+ }
+
+ Ref<CloudCacheClient> m_CloudClient;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const Oid m_BuildId;
+ IoBuffer m_MetaData;
+ Oid m_OplogBuildPartId = Oid::Zero;
+ std::filesystem::path m_TempFilePath;
+ bool m_EnableBlocks = true;
+ bool m_UseTempBlocks = true;
+
+ std::atomic_uint64_t m_SentBytes = {};
+ std::atomic_uint64_t m_ReceivedBytes = {};
+ std::atomic_uint64_t m_RequestTimeNS = {};
+ std::atomic_uint64_t m_RequestCount = {};
+ std::atomic_uint64_t m_PeakSentBytes = {};
+ std::atomic_uint64_t m_PeakReceivedBytes = {};
+ std::atomic_uint64_t m_PeakBytesPerSec = {};
+};
+
+std::shared_ptr<RemoteProjectStore>
+CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
+{
+ std::string Url = Options.Url;
+ if (Url.find("://"sv) == std::string::npos)
+ {
+ // Assume https URL
+ Url = fmt::format("https://{}"sv, Url);
+ }
+ CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv,
+ .ServiceUrl = Url,
+ .ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(1800000),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 4};
+ // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
+ // 2) Access token as parameter in request
+ // 3) Environment variable (different win vs linux/mac)
+ // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
+
+ std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+ if (!Options.OpenIdProvider.empty())
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() {
+ AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider);
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
+ }
+ else if (!Options.AccessToken.empty())
+ {
+ TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() {
+ return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()};
+ });
+ }
+ else
+ {
+ TokenProvider = CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager]() {
+ AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken("Default");
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
+ }
+
+ Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider)));
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(CloudClient),
+ Options.Namespace,
+ Options.Bucket,
+ Options.BuildId,
+ Options.MetaData,
+ Options.ForceDisableBlocks,
+ Options.ForceDisableTempBlocks,
+ TempFilePath);
+ return RemoteStore;
+}
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h
new file mode 100644
index 000000000..8b2c6c8c8
--- /dev/null
+++ b/src/zenserver/projectstore/buildsremoteprojectstore.h
@@ -0,0 +1,29 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "remoteprojectstore.h"
+
+namespace zen {
+
+class AuthMgr;
+
+struct BuildsRemoteStoreOptions : RemoteStoreOptions
+{
+ std::string Url;
+ std::string Namespace;
+ std::string Bucket;
+ Oid BuildId;
+ std::string OpenIdProvider;
+ std::string AccessToken;
+ AuthMgr& AuthManager;
+ bool ForceDisableBlocks = false;
+ bool ForceDisableTempBlocks = false;
+ bool AssumeHttp2 = false;
+ IoBuffer MetaData;
+};
+
+std::shared_ptr<RemoteProjectStore> CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options,
+ const std::filesystem::path& TempFilePath);
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
index 7d6c43c1e..0fe739a12 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -41,7 +41,6 @@ public:
.UseTempBlockFiles = m_UseTempBlocks,
.AllowChunking = true,
.ContainerName = m_Name,
- .BaseContainerName = m_OptionalBaseName,
.Description =
fmt::format("[file] {}/{}{}{}"sv, m_OutputPath, m_Name, m_OptionalBaseName.empty() ? "" : " Base: ", m_OptionalBaseName)};
}
@@ -57,6 +56,12 @@ public:
.m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
}
+ virtual CreateContainerResult CreateContainer() override
+ {
+ // Nothing to do here
+ return {};
+ }
+
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
Stopwatch Timer;
@@ -101,7 +106,7 @@ public:
return Result;
}
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override
{
Stopwatch Timer;
SaveAttachmentResult Result;
@@ -139,7 +144,7 @@ public:
for (const SharedBuffer& Chunk : Chunks)
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
- SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash());
+ SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {});
if (ChunkResult.ErrorCode)
{
ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
@@ -154,29 +159,42 @@ public:
virtual FinalizeResult FinalizeContainer(const IoHash&) override { return {}; }
virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Name); }
- virtual LoadContainerResult LoadBaseContainer() override
+
+ virtual GetKnownBlocksResult GetKnownBlocks() override
{
if (m_OptionalBaseName.empty())
{
- return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
+ return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
}
- return LoadContainer(m_OptionalBaseName);
- }
-
- virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) override
- {
- Stopwatch Timer;
- HasAttachmentsResult Result;
- for (const IoHash& RawHash : RawHashes)
+ LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseName);
+ if (LoadResult.ErrorCode)
+ {
+ return GetKnownBlocksResult{LoadResult};
+ }
+ Stopwatch Timer;
+ std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject);
+ if (BlockHashes.empty())
+ {
+ return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
+ .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
+ }
+ std::vector<IoHash> ExistingBlockHashes;
+ for (const IoHash& RawHash : BlockHashes)
{
std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
- if (!std::filesystem::is_regular_file(ChunkPath))
+ if (std::filesystem::is_regular_file(ChunkPath))
{
- Result.Needs.insert(RawHash);
+ ExistingBlockHashes.push_back(RawHash);
}
}
- AddStats(0, 0, Timer.GetElapsedTimeUs() * 1000);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ if (ExistingBlockHashes.empty())
+ {
+ return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
+ .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
+ }
+ std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
+ GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
+ Result.Blocks = std::move(KnownBlocks);
return Result;
}
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index d926499c4..f4fe578ff 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -46,7 +46,6 @@ public:
.UseTempBlockFiles = m_UseTempBlocks,
.AllowChunking = true,
.ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key),
- .BaseContainerName = m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key),
.Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv,
m_CloudClient->ServiceUrl(),
m_Namespace,
@@ -66,6 +65,12 @@ public:
.m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
}
+ virtual CreateContainerResult CreateContainer() override
+ {
+ // Nothing to do here
+ return {};
+ }
+
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
CloudCacheSession Session(m_CloudClient.Get());
@@ -85,7 +90,7 @@ public:
return Result;
}
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override
{
CloudCacheSession Session(m_CloudClient.Get());
CloudCacheResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
@@ -109,7 +114,7 @@ public:
for (const SharedBuffer& Chunk : Chunks)
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
- SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash());
+ SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {});
if (ChunkResult.ErrorCode)
{
return SaveAttachmentsResult{ChunkResult};
@@ -139,31 +144,58 @@ public:
virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Key); }
- virtual LoadContainerResult LoadBaseContainer() override
+ virtual GetKnownBlocksResult GetKnownBlocks() override
{
if (m_OptionalBaseKey == IoHash::Zero)
{
- return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
+ return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
+ }
+ LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseKey);
+ if (LoadResult.ErrorCode)
+ {
+ return GetKnownBlocksResult{LoadResult};
+ }
+ std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject);
+ if (BlockHashes.empty())
+ {
+ return GetKnownBlocksResult{
+ {.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}};
}
- return LoadContainer(m_OptionalBaseKey);
- }
- virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) override
- {
CloudCacheSession Session(m_CloudClient.Get());
CloudCacheExistsResult ExistsResult =
- Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(RawHashes.begin(), RawHashes.end()));
+ Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end()));
AddStats(ExistsResult);
- HasAttachmentsResult Result{ConvertResult(ExistsResult),
- std::unordered_set<IoHash, IoHash::Hasher>(ExistsResult.Needs.begin(), ExistsResult.Needs.end())};
if (ExistsResult.ErrorCode)
{
- Result.Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'",
- m_CloudClient->ServiceUrl(),
- m_Namespace,
- Result.Reason);
+ return GetKnownBlocksResult{{.ErrorCode = ExistsResult.ErrorCode,
+ .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds,
+ .Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'",
+ m_CloudClient->ServiceUrl(),
+ m_Namespace,
+ ExistsResult.Reason)}};
+ }
+
+ Stopwatch Timer;
+ std::vector<IoHash> ExistingBlockHashes;
+ for (const IoHash& RawHash : BlockHashes)
+ {
+ if (!ExistsResult.Needs.contains(RawHash))
+ {
+ ExistingBlockHashes.push_back(RawHash);
+ }
}
+ if (ExistingBlockHashes.empty())
+ {
+ return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
+ .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}};
+ }
+ std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
+
+ GetKnownBlocksResult Result{
+ {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}};
+ Result.Blocks = std::move(KnownBlocks);
return Result;
}
@@ -323,12 +355,21 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi
.AssumeHttp2 = Options.AssumeHttp2,
.AllowResume = true,
.RetryCount = 4};
- // 1) Access token as parameter in request
- // 2) Environment variable (different win vs linux/mac)
- // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
+ // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
+ // 2) Access token as parameter in request
+ // 3) Environment variable (different win vs linux/mac)
+ // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
- if (!Options.AccessToken.empty())
+ if (!Options.OpenIdProvider.empty())
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() {
+ AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider);
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
+ }
+ else if (!Options.AccessToken.empty())
{
TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() {
return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()};
@@ -336,11 +377,10 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi
}
else
{
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() {
- AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
+ TokenProvider = CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager]() {
+ AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken("Default");
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
}
Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider)));
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index f25042a62..63a80fbd8 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -24,6 +24,7 @@
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
+#include "buildsremoteprojectstore.h"
#include "fileremoteprojectstore.h"
#include "jupiterremoteprojectstore.h"
#include "remoteprojectstore.h"
@@ -266,6 +267,72 @@ namespace {
RemoteStore = CreateZenRemoteStore(Options, TempFilePath);
}
+ if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds)
+ {
+ std::string_view BuildsServiceUrl = Builds["url"sv].AsString();
+ if (BuildsServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = cpr::util::urlDecode(std::string(BuildsServiceUrl));
+ std::string_view Namespace = Builds["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Builds["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Builds["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+ std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString();
+ if (!AccessTokenEnvVariable.empty())
+ {
+ AccessToken = GetEnvVariable(AccessTokenEnvVariable);
+ }
+ }
+ std::string_view BuildIdParam = Builds["buildsid"sv].AsString();
+ if (BuildIdParam.empty())
+ {
+ return {nullptr, "Missing build id"};
+ }
+ if (BuildIdParam.length() != Oid::StringLength)
+ {
+ return {nullptr, "Invalid build id"};
+ }
+ Oid BuildId = Oid::FromHexString(BuildIdParam);
+ if (BuildId == Oid::Zero)
+ {
+ return {nullptr, "Invalid build id string"};
+ }
+
+ bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false);
+ bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false);
+
+ MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView();
+ IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize());
+
+ BuildsRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ BuildId,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks,
+ AssumeHttp2,
+ MetaData};
+ RemoteStore = CreateBuildsRemoteStore(Options, TempFilePath);
+ }
+
if (!RemoteStore)
{
return {nullptr, "Unknown remote store type"};
@@ -5132,7 +5199,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
/* BuildBlocks */ false,
/* IgnoreMissingAttachments */ false,
/* AllowChunking*/ false,
- [](CompressedBuffer&&, const IoHash&) {},
+ [](CompressedBuffer&&, RemoteProjectStore::Block&&) {},
[](const IoHash&, TGetAttachmentBufferFunc&&) {},
[](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
/* EmbedLooseFiles*/ false);
@@ -8354,8 +8421,9 @@ TEST_CASE("project.store.block")
return CompositeBuffer(SharedBuffer(Buffer));
}));
}
- CompressedBuffer Block = GenerateBlock(std::move(Chunks));
- CHECK(IterateBlock(Block.Decompress(), [](CompressedBuffer&&, const IoHash&) {}));
+ RemoteProjectStore::Block Block;
+ CompressedBuffer BlockBuffer = GenerateBlock(std::move(Chunks), Block);
+ CHECK(IterateBlock(BlockBuffer.Decompress(), [](CompressedBuffer&&, const IoHash&) {}));
}
TEST_CASE("project.store.iterateoplog")
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 49403d39c..216b1c4dd 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -143,13 +143,7 @@ namespace remotestore_impl {
NiceBytes(Stats.m_PeakReceivedBytes));
}
- struct Block
- {
- IoHash BlockHash;
- std::vector<IoHash> ChunksInBlock;
- };
-
- size_t AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
+ size_t AddBlock(RwLock& BlocksLock, std::vector<RemoteProjectStore::Block>& Blocks)
{
size_t BlockIndex;
{
@@ -741,14 +735,14 @@ namespace remotestore_impl {
});
};
- void CreateBlock(WorkerThreadPool& WorkerPool,
- Latch& OpSectionsLatch,
- std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
- RwLock& SectionsLock,
- std::vector<Block>& Blocks,
- size_t BlockIndex,
- const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
- AsyncRemoteResult& RemoteResult)
+ void CreateBlock(WorkerThreadPool& WorkerPool,
+ Latch& OpSectionsLatch,
+ std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
+ RwLock& SectionsLock,
+ std::vector<RemoteProjectStore::Block>& Blocks,
+ size_t BlockIndex,
+ const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
+ AsyncRemoteResult& RemoteResult)
{
OpSectionsLatch.AddCount(1);
WorkerPool.ScheduleWork([&Blocks,
@@ -767,16 +761,17 @@ namespace remotestore_impl {
try
{
ZEN_ASSERT(ChunkCount > 0);
- Stopwatch Timer;
- CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks));
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ Stopwatch Timer;
+ RemoteProjectStore::Block Block;
+ CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks), Block);
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
{
// We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex].BlockHash = BlockHash;
+ Blocks[BlockIndex] = Block;
}
uint64_t BlockSize = CompressedBlock.GetCompressedSize();
- AsyncOnBlock(std::move(CompressedBlock), BlockHash);
+ AsyncOnBlock(std::move(CompressedBlock), std::move(Block));
ZEN_INFO("Generated block with {} attachments in {} ({})",
ChunkCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
@@ -800,12 +795,18 @@ namespace remotestore_impl {
std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0;
};
+ struct CreatedBlock
+ {
+ IoBuffer Payload;
+ RemoteProjectStore::Block Block;
+ };
+
void UploadAttachments(WorkerThreadPool& WorkerPool,
CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks,
- const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
+ const std::unordered_map<IoHash, CreatedBlock, IoHash::Hasher>& CreatedBlocks,
const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments,
const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
bool ForceAll,
@@ -927,12 +928,12 @@ namespace remotestore_impl {
}
try
{
- bool IsBlock = false;
- IoBuffer Payload;
+ IoBuffer Payload;
+ RemoteProjectStore::Block Block;
if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
{
- Payload = BlockIt->second;
- IsBlock = true;
+ Payload = BlockIt->second.Payload;
+ Block = BlockIt->second.Block;
}
else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
{
@@ -953,9 +954,10 @@ namespace remotestore_impl {
RemoteResult.GetErrorReason());
return;
}
+ const bool IsBlock = Block.BlockHash == RawHash;
size_t PayloadSize = Payload.GetSize();
RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block));
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
@@ -1176,13 +1178,67 @@ IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuff
return true;
};
+std::vector<IoHash>
+GetBlockHashesFromOplog(CbObjectView ContainerObject)
+{
+ using namespace std::literals;
+ std::vector<RemoteProjectStore::Block> Result;
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+
+ std::vector<IoHash> BlockHashes;
+ BlockHashes.reserve(BlocksArray.Num());
+ for (CbFieldView BlockField : BlocksArray)
+ {
+ CbObjectView BlockView = BlockField.AsObjectView();
+ IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
+ BlockHashes.push_back(BlockHash);
+ }
+ return BlockHashes;
+}
+
+std::vector<RemoteProjectStore::Block>
+GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes)
+{
+ using namespace std::literals;
+ std::vector<RemoteProjectStore::Block> Result;
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+ tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet;
+ IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end());
+
+ Result.reserve(IncludeBlockHashes.size());
+ for (CbFieldView BlockField : BlocksArray)
+ {
+ CbObjectView BlockView = BlockField.AsObjectView();
+ IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
+ if (IncludeSet.contains(BlockHash))
+ {
+ std::vector<IoHash> ChunkHashes;
+ CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
+ if (BlockHash == IoHash::Zero)
+ {
+ continue;
+ }
+ ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ ChunkHashes.push_back(ChunkField.AsHash());
+ }
+ Result.push_back({.BlockHash = BlockHash, .ChunkHashes = std::move(ChunkHashes)});
+ }
+ }
+ return Result;
+}
+
CompressedBuffer
-GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks)
+GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock)
{
+ const size_t ChunkCount = FetchChunks.size();
+
std::vector<SharedBuffer> ChunkSegments;
ChunkSegments.resize(1);
- ChunkSegments.reserve(1 + FetchChunks.size());
- size_t ChunkCount = FetchChunks.size();
+ ChunkSegments.reserve(1 + ChunkCount);
+ OutBlock.ChunkHashes.reserve(ChunkCount);
+ OutBlock.ChunkLengths.reserve(ChunkCount);
{
IoBuffer TempBuffer(ChunkCount * 9);
MutableMemoryView View = TempBuffer.GetMutableView();
@@ -1200,6 +1256,8 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks)
ChunkSegments.push_back(Segment);
}
BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr);
+ OutBlock.ChunkHashes.push_back(It.first);
+ OutBlock.ChunkLengths.push_back(gsl::narrow<uint32_t>(ChunkSize));
}
ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
@@ -1207,7 +1265,8 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks)
}
CompressedBuffer CompressedBlock =
CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
-
+ OutBlock.BlockHash = CompressedBlock.DecodeRawHash();
+ OutBlock.FirstChunkOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + ChunkSegments[0].GetSize());
return CompressedBlock;
}
@@ -1221,9 +1280,9 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::vector<remotestore_impl::Block>& KnownBlocks,
+ const std::vector<RemoteProjectStore::Block>& KnownBlocks,
WorkerThreadPool& WorkerPool,
- const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles,
@@ -1245,9 +1304,9 @@ BuildContainer(CidStore& ChunkStore,
std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
- RwLock BlocksLock;
- std::vector<remotestore_impl::Block> Blocks;
- CompressedBuffer OpsBuffer;
+ RwLock BlocksLock;
+ std::vector<RemoteProjectStore::Block> Blocks;
+ CompressedBuffer OpsBuffer;
std::filesystem::path AttachmentTempPath = Oplog.TempPath();
AttachmentTempPath.append(".pending");
@@ -1463,7 +1522,7 @@ BuildContainer(CidStore& ChunkStore,
return {};
}
- auto FindReuseBlocks = [](const std::vector<remotestore_impl::Block>& KnownBlocks,
+ auto FindReuseBlocks = [](const std::vector<RemoteProjectStore::Block>& KnownBlocks,
const std::unordered_set<IoHash, IoHash::Hasher>& Attachments,
JobContext* OptionalContext) -> std::vector<size_t> {
std::vector<size_t> ReuseBlockIndexes;
@@ -1476,14 +1535,14 @@ BuildContainer(CidStore& ChunkStore,
for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
{
- const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
- size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
+ const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size();
if (BlockAttachmentCount == 0)
{
continue;
}
size_t FoundAttachmentCount = 0;
- for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
+ for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (Attachments.contains(KnownHash))
{
@@ -1524,8 +1583,8 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
- for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
+ const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (UploadAttachments.erase(KnownHash) == 1)
{
@@ -1865,8 +1924,8 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
- for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
+ const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (ChunkedHashes.erase(KnownHash) == 1)
{
@@ -1978,7 +2037,7 @@ BuildContainer(CidStore& ChunkStore,
uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
auto NewBlock = [&]() {
- size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
size_t ChunkCount = ChunksInBlock.size();
if (BuildBlocks)
{
@@ -2000,9 +2059,9 @@ BuildContainer(CidStore& ChunkStore,
{
// We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
+ Blocks[BlockIndex].ChunkHashes.insert(Blocks[BlockIndex].ChunkHashes.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
}
uint64_t NowMS = Timer.GetElapsedTimeMs();
ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
@@ -2234,12 +2293,11 @@ BuildContainer(CidStore& ChunkStore,
CbObjectWriter OplogContinerWriter;
RwLock::SharedLockScope _(BlocksLock);
OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
-
OplogContinerWriter.BeginArray("blocks"sv);
{
- for (const remotestore_impl::Block& B : Blocks)
+ for (const RemoteProjectStore::Block& B : Blocks)
{
- ZEN_ASSERT(!B.ChunksInBlock.empty());
+ ZEN_ASSERT(!B.ChunkHashes.empty());
if (BuildBlocks)
{
ZEN_ASSERT(B.BlockHash != IoHash::Zero);
@@ -2249,7 +2307,7 @@ BuildContainer(CidStore& ChunkStore,
OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash);
OplogContinerWriter.BeginArray("chunks"sv);
{
- for (const IoHash& RawHash : B.ChunksInBlock)
+ for (const IoHash& RawHash : B.ChunkHashes)
{
OplogContinerWriter.AddHash(RawHash);
}
@@ -2265,7 +2323,7 @@ BuildContainer(CidStore& ChunkStore,
{
OplogContinerWriter.BeginArray("chunks"sv);
{
- for (const IoHash& RawHash : B.ChunksInBlock)
+ for (const IoHash& RawHash : B.ChunkHashes)
{
OplogContinerWriter.AddBinaryAttachment(RawHash);
}
@@ -2331,7 +2389,7 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
@@ -2391,21 +2449,21 @@ SaveOplog(CidStore& ChunkStore,
CreateDirectories(AttachmentTempPath);
}
- remotestore_impl::AsyncRemoteResult RemoteResult;
- RwLock AttachmentsLock;
- std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
- tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
+ remotestore_impl::AsyncRemoteResult RemoteResult;
+ RwLock AttachmentsLock;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
+ std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks;
+ tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
- auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
- const IoHash& BlockHash) {
+ auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
+ RemoteProjectStore::Block&& Block) {
std::filesystem::path BlockPath = AttachmentTempPath;
- BlockPath.append(BlockHash.ToHexString());
+ BlockPath.append(Block.BlockHash.ToHexString());
try
{
IoBuffer BlockBuffer = remotestore_impl::WriteToTempFile(std::move(CompressedBlock), BlockPath);
RwLock::ExclusiveLockScope __(AttachmentsLock);
- CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
+ CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}});
ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize()));
}
catch (const std::exception& Ex)
@@ -2417,8 +2475,11 @@ SaveOplog(CidStore& ChunkStore,
}
};
- auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
- RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash);
+ auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock,
+ RemoteProjectStore::Block&& Block) {
+ IoHash BlockHash = Block.BlockHash;
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block));
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
@@ -2448,7 +2509,7 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Found attachment {}", AttachmentHash);
};
- std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock;
+ std::function<void(CompressedBuffer&&, RemoteProjectStore::Block &&)> OnBlock;
if (RemoteStoreInfo.UseTempBlockFiles)
{
OnBlock = MakeTempBlock;
@@ -2458,95 +2519,52 @@ SaveOplog(CidStore& ChunkStore,
OnBlock = UploadBlock;
}
- std::vector<remotestore_impl::Block> KnownBlocks;
+ std::vector<RemoteProjectStore::Block> KnownBlocks;
uint64_t TransferWallTimeMS = 0;
- if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty())
+ RemoteProjectStore::CreateContainerResult ContainerResult = RemoteStore.CreateContainer();
+ if (ContainerResult.ErrorCode)
{
+ RemoteProjectStore::Result Result = {.ErrorCode = ContainerResult.ErrorCode,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Text = fmt::format("Failed to create container for oplog '{}' ({}): {}",
+ RemoteStoreInfo.ContainerName,
+ ContainerResult.ErrorCode,
+ ContainerResult.Reason)};
remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName));
- Stopwatch LoadBaseContainerTimer;
- RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer();
- TransferWallTimeMS += LoadBaseContainerTimer.GetElapsedTimeMs();
-
- if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent))
- {
- if (BaseContainerResult.ErrorCode)
- {
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments",
- RemoteStoreInfo.BaseContainerName,
- BaseContainerResult.ErrorCode,
- BaseContainerResult.Reason));
- }
- else
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Loaded oplog base container in {}",
- NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0))));
-
- CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView();
-
- std::vector<IoHash> BlockHashes;
- BlockHashes.reserve(BlocksArray.Num());
- for (CbFieldView BlockField : BlocksArray)
- {
- CbObjectView BlockView = BlockField.AsObjectView();
- IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
- BlockHashes.push_back(BlockHash);
- }
-
- RemoteProjectStore::HasAttachmentsResult HasResult = RemoteStore.HasAttachments(BlockHashes);
- if (HasResult.ErrorCode == 0)
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Checked the existance of {} block{} in remote store, found {} existing blocks in {}",
- BlockHashes.size(),
- BlockHashes.size() > 1 ? "s"sv : ""sv,
- BlockHashes.size() - HasResult.Needs.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0))));
- if (HasResult.Needs.size() < BlocksArray.Num())
- {
- KnownBlocks.reserve(BlocksArray.Num() - HasResult.Needs.size());
-
- const std::unordered_set<IoHash, IoHash::Hasher> MissingBlocks(HasResult.Needs);
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return Result;
+ }
- for (CbFieldView BlockField : BlocksArray)
- {
- CbObjectView BlockView = BlockField.AsObjectView();
- IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
- if (!MissingBlocks.contains(BlockHash))
- {
- std::vector<IoHash> ChunksInBlock;
- CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
- if (BlockHash == IoHash::Zero)
- {
- continue;
- }
+ if (RemoteStoreInfo.CreateBlocks)
+ {
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching known blocks from '{}'", RemoteStoreInfo.Description));
+ Stopwatch GetKnownBlocksTimer;
+ RemoteProjectStore::GetKnownBlocksResult KnownBlocksResult = RemoteStore.GetKnownBlocks();
+ TransferWallTimeMS += GetKnownBlocksTimer.GetElapsedTimeMs();
- ChunksInBlock.reserve(ChunksArray.Num());
- for (CbFieldView ChunkField : ChunksArray)
- {
- ChunksInBlock.push_back(ChunkField.AsHash());
- }
- KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)});
- }
- }
- }
- }
- else
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Unable to determine which blocks in base container exist in remote store, assuming none "
- "does: '{}', error code : {}",
- HasResult.Reason,
- HasResult.ErrorCode));
- }
- }
+ if (KnownBlocksResult.ErrorCode == static_cast<int>(HttpResponseCode::NoContent))
+ {
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("No known blocks in '{}', uploading all attachments", RemoteStoreInfo.Description));
+ }
+ else if (KnownBlocksResult.ErrorCode)
+ {
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Failed to get known blocks from '{}' ({}): {}, uploading all attachments",
+ RemoteStoreInfo.Description,
+ KnownBlocksResult.ErrorCode,
+ KnownBlocksResult.Reason));
+ }
+ else
+ {
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Fetched {} known blocks from '{}' in {}",
+ KnownBlocksResult.Blocks.size(),
+ RemoteStoreInfo.Description,
+ NiceTimeSpanMs(static_cast<uint64_t>(KnownBlocksResult.ElapsedSeconds * 1000.0))));
+ KnownBlocks = std::move(KnownBlocksResult.Blocks);
}
}
@@ -2588,15 +2606,22 @@ SaveOplog(CidStore& ChunkStore,
RemoteStoreInfo.ContainerName,
ChunkCount,
BlockCount));
- Stopwatch SaveContainerTimer;
- RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
+ Stopwatch SaveContainerTimer;
+ IoBuffer ContainerPayload = OplogContainerObject.GetBuffer().AsIoBuffer();
+ ContainerPayload.SetContentType(ZenContentType::kCbObject);
+ RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(std::move(ContainerPayload));
TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs();
if (ContainerSaveResult.ErrorCode)
{
RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
+ RemoteProjectStore::Result Result = {
+ .ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Text = fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())};
remotestore_impl::ReportMessage(
OptionalContext,
fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return Result;
}
else
{
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index f0655fb59..e05cb9923 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -16,6 +16,14 @@ struct ChunkedInfo;
class RemoteProjectStore
{
public:
+ struct Block
+ {
+ IoHash BlockHash;
+ std::vector<IoHash> ChunkHashes;
+ std::vector<uint32_t> ChunkLengths;
+ uint32_t FirstChunkOffset = (uint32_t)-1;
+ };
+
struct Result
{
int32_t ErrorCode{};
@@ -24,6 +32,10 @@ public:
std::string Text;
};
+ struct CreateContainerResult : public Result
+ {
+ };
+
struct SaveResult : public Result
{
std::unordered_set<IoHash, IoHash::Hasher> Needs;
@@ -43,11 +55,6 @@ public:
{
};
- struct HasAttachmentsResult : public Result
- {
- std::unordered_set<IoHash, IoHash::Hasher> Needs;
- };
-
struct LoadAttachmentResult : public Result
{
IoBuffer Bytes;
@@ -63,13 +70,17 @@ public:
std::vector<std::pair<IoHash, CompressedBuffer>> Chunks;
};
+ struct GetKnownBlocksResult : public Result
+ {
+ std::vector<Block> Blocks;
+ };
+
struct RemoteStoreInfo
{
bool CreateBlocks;
bool UseTempBlockFiles;
bool AllowChunking;
std::string ContainerName;
- std::string BaseContainerName;
std::string Description;
};
@@ -90,15 +101,15 @@ public:
virtual RemoteStoreInfo GetInfo() const = 0;
virtual Stats GetStats() const = 0;
- virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0;
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0;
- virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0;
- virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
+ virtual CreateContainerResult CreateContainer() = 0;
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0;
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) = 0;
+ virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0;
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
virtual LoadContainerResult LoadContainer() = 0;
- virtual LoadContainerResult LoadBaseContainer() = 0;
+ virtual GetKnownBlocksResult GetKnownBlocks() = 0;
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0;
- virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) = 0;
virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0;
};
@@ -126,7 +137,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles);
@@ -162,7 +173,9 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
bool CleanOplog,
JobContext* OptionalContext);
-CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks);
+CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock);
bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
+std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject);
+std::vector<RemoteProjectStore::Block> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes);
} // namespace zen
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index 6b05442b3..566d0d4b2 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -36,7 +36,6 @@ public:
.UseTempBlockFiles = false,
.AllowChunking = false,
.ContainerName = fmt::format("{}/{}", m_Project, m_Oplog),
- .BaseContainerName = "",
.Description = fmt::format("[zen] {}"sv, m_HostAddress)};
}
@@ -51,6 +50,12 @@ public:
.m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
}
+ virtual CreateContainerResult CreateContainer() override
+ {
+ // Nothing to do here
+ return {};
+ }
+
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog);
@@ -88,7 +93,7 @@ public:
return Result;
}
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override
{
std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary);
@@ -231,16 +236,9 @@ public:
return Result;
}
- virtual LoadContainerResult LoadBaseContainer() override
- {
- return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
- }
-
- virtual HasAttachmentsResult HasAttachments(const std::span<IoHash>) override
+ virtual GetKnownBlocksResult GetKnownBlocks() override
{
- // For zen as remote store we never store blocks so we should never get here
- ZEN_ASSERT(false);
- return HasAttachmentsResult{};
+ return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
}
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp
index 3e477f053..2ae977f00 100644
--- a/src/zenserver/upstream/jupiter.cpp
+++ b/src/zenserver/upstream/jupiter.cpp
@@ -378,6 +378,157 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view
return Result;
}
+CloudCacheResult
+CloudCacheSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload);
+ return detail::ConvertResponse(Response, "CloudCacheSession::PutBuild"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetBuild"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId));
+ return detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuild"sv);
+}
+
+PutBuildPartResult
+CloudCacheSession::PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+
+ IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName),
+ Payload,
+ {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+
+ PutBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::PutBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ Result.RawHash = Hash;
+ }
+ return Result;
+}
+
+CloudCacheResult
+CloudCacheSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+{
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildPart"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ Payload,
+ ContentType);
+ return detail::ConvertResponse(Response, "CloudCacheSession::PutBuildBlob"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Download(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ TempFolderPath);
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildBlob"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ Payload);
+ return detail::ConvertResponse(Response, "CloudCacheSession::PutBlockMetadata"sv);
+}
+
+FinalizeBuildPartResult
+CloudCacheSession::FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()),
+ HttpClient::Accept(ZenContentType::kCbObject));
+
+ FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+ return Result;
+}
+
+CloudCacheResult
+CloudCacheSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "CloudCacheSession::FindBlocks"sv);
+}
+
/**
* An access token provider that holds a token that will never change.
*/
diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h
index 00ba55bad..50e4ad68a 100644
--- a/src/zenserver/upstream/jupiter.h
+++ b/src/zenserver/upstream/jupiter.h
@@ -81,6 +81,17 @@ struct GetObjectReferencesResult : CloudCacheResult
std::set<IoHash> References;
};
+struct PutBuildPartResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+ IoHash RawHash;
+};
+
+struct FinalizeBuildPartResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
/**
* Context for performing Jupiter operations
*
@@ -95,6 +106,7 @@ public:
~CloudCacheSession();
CloudCacheResult Authenticate();
+
CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
@@ -127,6 +139,42 @@ public:
std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+ CloudCacheResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
+ CloudCacheResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ CloudCacheResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ PutBuildPartResult PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload);
+ CloudCacheResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ CloudCacheResult PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload);
+ CloudCacheResult GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath);
+ CloudCacheResult PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ const IoBuffer& Payload);
+ FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash);
+ CloudCacheResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+
CloudCacheClient& Client() { return *m_CacheClient; };
private:
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 851b1d125..9f09713ee 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1457,7 +1457,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
if (SetMetaInfo)
{
// See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
- // metadata separately, check if it had time to set the metdata
+ // metadata separately, check if it had time to set the metadata
RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
{
@@ -1578,7 +1578,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
{
// See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
- // metadata separately, check if it had time to set the metdata
+ // metadata separately, check if it had time to set the metadata
RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
{