diff options
| author | Dan Engelbrecht <[email protected]> | 2024-12-12 08:27:54 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-12-12 08:27:54 +0100 |
| commit | 9bb2bf10a76127fea1db01fab42c795bdc07c936 (patch) | |
| tree | 4bdb9d40ee265798afe4ec439dea45b7a7c5ed3c /src | |
| parent | Memory tracking improvements (#262) (diff) | |
| download | zen-9bb2bf10a76127fea1db01fab42c795bdc07c936.tar.xz zen-9bb2bf10a76127fea1db01fab42c795bdc07c936.zip | |
Builds API remote project store (#258)
Feature: zen command oplog-export and oplog-import now supports --builds remote target using the Jupiter builds API
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 339 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.h | 43 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.cpp | 561 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.h | 29 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.cpp | 52 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 90 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 74 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 323 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 41 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.cpp | 20 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 151 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.h | 48 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 4 |
13 files changed, 1465 insertions, 310 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 1e4f2675a..5e18a3624 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -29,7 +29,7 @@ namespace { using namespace std::literals; - const std::string DefaultCloudAccessTokenEnvVariableName( + const std::string DefaultJupiterAccessTokenEnvVariableName( #if ZEN_PLATFORM_WINDOWS "UE-CloudDataCacheAccessToken"sv #endif @@ -39,7 +39,7 @@ namespace { ); - std::string ReadCloudAccessTokenFromFile(const std::filesystem::path& Path) + std::string ReadJupiterAccessTokenFromFile(const std::filesystem::path& Path) { if (!std::filesystem::is_regular_file(Path)) { @@ -839,43 +839,66 @@ ExportOplogCommand::ExportOplogCommand() "<disable>"); m_Options.add_option("", "a", "async", "Trigger export but don't wait for completion", cxxopts::value(m_Async), "<async>"); - m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); - m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); - m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>"); - m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>"); - m_Options.add_option("cloud", + m_Options.add_option("", "", "namespace", "Cloud/Builds Storage namespace", cxxopts::value(m_JupiterNamespace), "<namespace>"); + m_Options.add_option("", "", "bucket", "Cloud/Builds Storage bucket", cxxopts::value(m_JupiterBucket), "<bucket>"); + m_Options.add_option("", "", - "basekey", - "Optional Base Cloud Storage key for incremental export", - cxxopts::value(m_BaseCloudKey), - "<key>"); + "openid-provider", + "Cloud/Builds Storage openid provider", + cxxopts::value(m_JupiterOpenIdProvider), + "<provider>"); m_Options - .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>"); - m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>"); - m_Options.add_option("cloud", + .add_option("", "", "access-token", "Cloud/Builds Storage access token", cxxopts::value(m_JupiterAccessToken), "<accesstoken>"); + m_Options.add_option("", "", "access-token-env", - "Name of environment variable that holds the cloud Storage access token", - cxxopts::value(m_CloudAccessTokenEnv)->default_value(DefaultCloudAccessTokenEnvVariableName), + "Name of environment variable that holds the cloud/builds Storage access token", + cxxopts::value(m_JupiterAccessTokenEnv)->default_value(DefaultJupiterAccessTokenEnvVariableName), "<envvariable>"); - m_Options.add_option("cloud", + m_Options.add_option("", "", "access-token-path", - "Path to json file that holds the cloud Storage access token", - cxxopts::value(m_CloudAccessTokenPath), + "Path to json file that holds the cloud/builds Storage access token", + cxxopts::value(m_JupiterAccessTokenPath), "<filepath>"); - m_Options.add_option("cloud", + m_Options.add_option("", "", "assume-http2", - "Assume that the cloud endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake", - cxxopts::value(m_CloudAssumeHttp2), + "Assume that the cloud/builds endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake", + cxxopts::value(m_JupiterAssumeHttp2), "<assumehttp2>"); + m_Options.add_option( + "", + "", + "disabletempblocks", + "Disable temp block creation and upload blocks without waiting for oplog container to be uploaded for cloud/builds storage", + cxxopts::value(m_JupiterDisableTempBlocks), + "<disable>"); + + m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); + m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>"); m_Options.add_option("cloud", "", - "disabletempblocks", - "Disable temp block creation and upload blocks without waiting for oplog container to be uploaded", - cxxopts::value(m_CloudDisableTempBlocks), - "<disable>"); + "basekey", + "Optional Base Cloud Storage key for incremental export", + cxxopts::value(m_BaseCloudKey), + "<key>"); + + m_Options.add_option("", "", "builds", "Builds Storage API URL", cxxopts::value(m_BuildsUrl), "<url>"); + m_Options.add_option("builds", "", "builds-id", "Builds Id", cxxopts::value(m_BuildsId), "<id>"); + + m_Options.add_option("builds", + "", + "builds-metadata-path", + "Path to json file that holds the metadata for the build", + cxxopts::value(m_BuildsMetadataPath), + "<metadata-path>"); + m_Options.add_option("builds", + "", + "builds-metadata", + "Key-value pairs separated by ';' with build meta data. (key1=value1;key2=value2)", + cxxopts::value(m_BuildsMetadata), + "<metadata>"); m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>"); m_Options.add_option("zen", "", "target-project", "Zen target project name", cxxopts::value(m_ZenProjectName), "<targetprojectid>"); @@ -945,6 +968,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg size_t TargetCount = 0; TargetCount += m_CloudUrl.empty() ? 0 : 1; + TargetCount += m_BuildsUrl.empty() ? 0 : 1; TargetCount += m_ZenUrl.empty() ? 0 : 1; TargetCount += m_FileDirectoryPath.empty() ? 0 : 1; if (TargetCount != 1) @@ -961,7 +985,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg if (!m_CloudUrl.empty()) { - if (m_CloudNamespace.empty() || m_CloudBucket.empty()) + if (m_JupiterNamespace.empty() || m_JupiterBucket.empty()) { ZEN_ERROR("Options for cloud target are missing"); ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str()); @@ -969,13 +993,40 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg } if (m_CloudKey.empty()) { - std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket); + std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_JupiterNamespace, m_JupiterBucket); IoHash Key = IoHash::HashBuffer(KeyString.data(), KeyString.size()); m_CloudKey = Key.ToHexString(); ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey); } } + if (!m_BuildsUrl.empty()) + { + if (m_JupiterNamespace.empty() || m_JupiterBucket.empty()) + { + ZEN_ERROR("Options for builds target are missing"); + ZEN_CONSOLE("{}", m_Options.help({"builds"}).c_str()); + return 1; + } + if (m_BuildsMetadataPath.empty() && m_BuildsMetadata.empty()) + { + ZEN_ERROR("Options for builds target are missing"); + ZEN_CONSOLE("{}", m_Options.help({"builds"}).c_str()); + return 1; + } + if (!m_BuildsMetadataPath.empty() && !m_BuildsMetadata.empty()) + { + ZEN_ERROR("Conflicting options for builds target"); + ZEN_CONSOLE("{}", m_Options.help({"builds"}).c_str()); + return 1; + } + if (m_BuildsId.empty()) + { + m_BuildsId = Oid::NewOid().ToString(); + ZEN_CONSOLE("Using generated builds id: {}", m_BuildsId); + } + } + if (!m_ZenUrl.empty()) { if (m_ZenProjectName.empty()) @@ -1101,32 +1152,32 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg Writer.BeginObject("cloud"sv); { Writer.AddString("url"sv, m_CloudUrl); - Writer.AddString("namespace"sv, m_CloudNamespace); - Writer.AddString("bucket"sv, m_CloudBucket); + Writer.AddString("namespace"sv, m_JupiterNamespace); + Writer.AddString("bucket"sv, m_JupiterBucket); Writer.AddString("key"sv, m_CloudKey); if (!m_BaseCloudKey.empty()) { Writer.AddString("basekey"sv, m_BaseCloudKey); } - if (!m_CloudOpenIdProvider.empty()) + if (!m_JupiterOpenIdProvider.empty()) { - Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider); + Writer.AddString("openid-provider"sv, m_JupiterOpenIdProvider); } - if (!m_CloudAccessToken.empty()) + if (!m_JupiterAccessToken.empty()) { - Writer.AddString("access-token"sv, m_CloudAccessToken); + Writer.AddString("access-token"sv, m_JupiterAccessToken); } - if (!m_CloudAccessTokenPath.empty()) + if (!m_JupiterAccessTokenPath.empty()) { - std::string ResolvedCloudAccessToken = ReadCloudAccessTokenFromFile(m_CloudAccessTokenPath); + std::string ResolvedCloudAccessToken = ReadJupiterAccessTokenFromFile(m_JupiterAccessTokenPath); if (!ResolvedCloudAccessToken.empty()) { Writer.AddString("access-token"sv, ResolvedCloudAccessToken); } } - if (!m_CloudAccessTokenEnv.empty()) + if (!m_JupiterAccessTokenEnv.empty()) { - std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_CloudAccessTokenEnv); + std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_JupiterAccessTokenEnv); if (!ResolvedCloudAccessTokenEnv.empty()) { @@ -1134,10 +1185,10 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg } else { - Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv); + Writer.AddString("access-token-env"sv, m_JupiterAccessTokenEnv); } } - if (m_CloudAssumeHttp2) + if (m_JupiterAssumeHttp2) { Writer.AddBool("assumehttp2"sv, true); } @@ -1145,7 +1196,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddBool("disableblocks"sv, true); } - if (m_CloudDisableTempBlocks) + if (m_JupiterDisableTempBlocks) { Writer.AddBool("disabletempblocks"sv, true); } @@ -1153,12 +1204,89 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg Writer.EndObject(); // "cloud" TargetDescription = fmt::format("[cloud] {}/{}/{}/{}{}{}", m_CloudUrl, - m_CloudNamespace, - m_CloudBucket, + m_JupiterNamespace, + m_JupiterBucket, m_CloudKey, m_BaseCloudKey.empty() ? "" : " Base: ", m_BaseCloudKey); } + if (!m_BuildsUrl.empty()) + { + Writer.BeginObject("builds"sv); + { + Writer.AddString("url"sv, m_BuildsUrl); + Writer.AddString("namespace"sv, m_JupiterNamespace); + Writer.AddString("bucket"sv, m_JupiterBucket); + Writer.AddString("buildsid"sv, m_BuildsId); + if (!m_JupiterOpenIdProvider.empty()) + { + Writer.AddString("openid-provider"sv, m_JupiterOpenIdProvider); + } + if (!m_JupiterAccessToken.empty()) + { + Writer.AddString("access-token"sv, m_JupiterAccessToken); + } + if (!m_JupiterAccessTokenPath.empty()) + { + std::string ResolvedCloudAccessToken = ReadJupiterAccessTokenFromFile(m_JupiterAccessTokenPath); + if (!ResolvedCloudAccessToken.empty()) + { + Writer.AddString("access-token"sv, ResolvedCloudAccessToken); + } + } + if (!m_JupiterAccessTokenEnv.empty()) + { + std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_JupiterAccessTokenEnv); + + if (!ResolvedCloudAccessTokenEnv.empty()) + { + Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv); + } + else + { + Writer.AddString("access-token-env"sv, m_JupiterAccessTokenEnv); + } + } + if (m_JupiterAssumeHttp2) + { + Writer.AddBool("assumehttp2"sv, true); + } + if (m_DisableBlocks) + { + Writer.AddBool("disableblocks"sv, true); + } + if (m_JupiterDisableTempBlocks) + { + Writer.AddBool("disabletempblocks"sv, true); + } + + if (!m_BuildsMetadataPath.empty()) + { + std::filesystem::path MetadataPath(m_BuildsMetadataPath); + IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten(); + std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize()); + CbFieldIterator MetaData = LoadCompactBinaryFromJson(Json); + Writer.AddBinary("metadata"sv, MetaData.GetBuffer()); + } + if (!m_BuildsMetadata.empty()) + { + CbObjectWriter MetaDataWriter(m_BuildsMetadata.length()); + ForEachStrTok(m_BuildsMetadata, ';', [&](std::string_view Pair) { + size_t SplitPos = Pair.find('='); + if (SplitPos == std::string::npos || SplitPos == 0) + { + throw std::runtime_error(fmt::format("builds metadata key-value pair '{}' is malformed", Pair)); + } + MetaDataWriter.AddString(Pair.substr(0, SplitPos), Pair.substr(SplitPos + 1)); + return true; + }); + CbObject MetaData = MetaDataWriter.Save(); + Writer.AddBinary("metadata"sv, MetaData.GetBuffer()); + } + } + Writer.EndObject(); // "builds" + TargetDescription = fmt::format("[builds] {}/{}/{}/{}", m_CloudUrl, m_JupiterNamespace, m_JupiterBucket, m_BuildsId); + } if (!m_ZenUrl.empty()) { Writer.BeginObject("zen"sv); @@ -1230,32 +1358,41 @@ ImportOplogCommand::ImportOplogCommand() cxxopts::value(m_IgnoreMissingAttachments), "<ignore>"); - m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); - m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); - m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>"); - m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>"); + m_Options.add_option("", "", "namespace", "Cloud/Builds Storage namespace", cxxopts::value(m_JupiterNamespace), "<namespace>"); + m_Options.add_option("", "", "bucket", "Cloud/Builds Storage bucket", cxxopts::value(m_JupiterBucket), "<bucket>"); + m_Options.add_option("", + "", + "openid-provider", + "Cloud/Builds Storage openid provider", + cxxopts::value(m_JupiterOpenIdProvider), + "<provider>"); m_Options - .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>"); - m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>"); - m_Options.add_option("cloud", + .add_option("", "", "access-token", "Cloud/Builds Storage access token", cxxopts::value(m_JupiterAccessToken), "<accesstoken>"); + m_Options.add_option("", "", "access-token-env", - "Name of environment variable that holds the cloud Storage access token", - cxxopts::value(m_CloudAccessTokenEnv)->default_value(DefaultCloudAccessTokenEnvVariableName), + "Name of environment variable that holds the cloud/builds Storage access token", + cxxopts::value(m_JupiterAccessTokenEnv)->default_value(DefaultJupiterAccessTokenEnvVariableName), "<envvariable>"); - m_Options.add_option("cloud", + m_Options.add_option("", "", "access-token-path", - "Path to json file that holds the cloud Storage access token", - cxxopts::value(m_CloudAccessTokenPath), + "Path to json file that holds the cloud/builds Storage access token", + cxxopts::value(m_JupiterAccessTokenPath), "<filepath>"); - m_Options.add_option("cloud", + m_Options.add_option("", "", "assume-http2", - "Assume that the cloud endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake", - cxxopts::value(m_CloudAssumeHttp2), + "Assume that the cloud/builds endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake", + cxxopts::value(m_JupiterAssumeHttp2), "<assumehttp2>"); + m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); + m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>"); + + m_Options.add_option("", "", "builds", "Builds Storage URL", cxxopts::value(m_BuildsUrl), "<url>"); + m_Options.add_option("builds", "", "builds-id", "Builds Id", cxxopts::value(m_BuildsId), "<id>"); + m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>"); m_Options.add_option("zen", "", "source-project", "Zen source project name", cxxopts::value(m_ZenProjectName), "<sourceprojectid>"); m_Options.add_option("zen", "", "source-oplog", "Zen source oplog name", cxxopts::value(m_ZenOplogName), "<sourceoplogid>"); @@ -1319,6 +1456,7 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg size_t TargetCount = 0; TargetCount += m_CloudUrl.empty() ? 0 : 1; + TargetCount += m_BuildsUrl.empty() ? 0 : 1; TargetCount += m_ZenUrl.empty() ? 0 : 1; TargetCount += m_FileDirectoryPath.empty() ? 0 : 1; if (TargetCount != 1) @@ -1330,7 +1468,7 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg if (!m_CloudUrl.empty()) { - if (m_CloudNamespace.empty() || m_CloudBucket.empty()) + if (m_JupiterNamespace.empty() || m_JupiterBucket.empty()) { ZEN_ERROR("Options for cloud source are missing"); ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str()); @@ -1338,13 +1476,23 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg } if (m_CloudKey.empty()) { - std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket); + std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_JupiterNamespace, m_JupiterBucket); IoHash Key = IoHash::HashBuffer(KeyString.data(), KeyString.size()); m_CloudKey = Key.ToHexString(); ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey); } } + if (!m_BuildsUrl.empty()) + { + if (m_JupiterNamespace.empty() || m_JupiterBucket.empty() || m_BuildsId.empty()) + { + ZEN_ERROR("Options for builds source are missing"); + ZEN_CONSOLE("{}", m_Options.help({"builds"}).c_str()); + return 1; + } + } + if (!m_ZenUrl.empty()) { if (m_ZenProjectName.empty()) @@ -1434,28 +1582,28 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg Writer.BeginObject("cloud"sv); { Writer.AddString("url"sv, m_CloudUrl); - Writer.AddString("namespace"sv, m_CloudNamespace); - Writer.AddString("bucket"sv, m_CloudBucket); + Writer.AddString("namespace"sv, m_JupiterNamespace); + Writer.AddString("bucket"sv, m_JupiterBucket); Writer.AddString("key"sv, m_CloudKey); - if (!m_CloudOpenIdProvider.empty()) + if (!m_JupiterOpenIdProvider.empty()) { - Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider); + Writer.AddString("openid-provider"sv, m_JupiterOpenIdProvider); } - if (!m_CloudAccessToken.empty()) + if (!m_JupiterAccessToken.empty()) { - Writer.AddString("access-token"sv, m_CloudAccessToken); + Writer.AddString("access-token"sv, m_JupiterAccessToken); } - if (!m_CloudAccessTokenPath.empty()) + if (!m_JupiterAccessTokenPath.empty()) { - std::string ResolvedCloudAccessToken = ReadCloudAccessTokenFromFile(m_CloudAccessTokenPath); + std::string ResolvedCloudAccessToken = ReadJupiterAccessTokenFromFile(m_JupiterAccessTokenPath); if (!ResolvedCloudAccessToken.empty()) { Writer.AddString("access-token"sv, ResolvedCloudAccessToken); } } - if (!m_CloudAccessTokenEnv.empty()) + if (!m_JupiterAccessTokenEnv.empty()) { - std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_CloudAccessTokenEnv); + std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_JupiterAccessTokenEnv); if (!ResolvedCloudAccessTokenEnv.empty()) { @@ -1463,16 +1611,61 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg } else { - Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv); + Writer.AddString("access-token-env"sv, m_JupiterAccessTokenEnv); } } - if (m_CloudAssumeHttp2) + if (m_JupiterAssumeHttp2) { Writer.AddBool("assumehttp2"sv, true); } } Writer.EndObject(); // "cloud" - SourceDescription = fmt::format("[cloud] {}/{}/{}/{}", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey); + SourceDescription = fmt::format("[cloud] {}/{}/{}/{}", m_CloudUrl, m_JupiterNamespace, m_JupiterBucket, m_CloudKey); + } + if (!m_BuildsUrl.empty()) + { + Writer.BeginObject("builds"sv); + { + Writer.AddString("url"sv, m_BuildsUrl); + Writer.AddString("namespace"sv, m_JupiterNamespace); + Writer.AddString("bucket"sv, m_JupiterBucket); + Writer.AddString("buildsid"sv, m_BuildsId); + if (!m_JupiterOpenIdProvider.empty()) + { + Writer.AddString("openid-provider"sv, m_JupiterOpenIdProvider); + } + if (!m_JupiterAccessToken.empty()) + { + Writer.AddString("access-token"sv, m_JupiterAccessToken); + } + if (!m_JupiterAccessTokenPath.empty()) + { + std::string ResolvedCloudAccessToken = ReadJupiterAccessTokenFromFile(m_JupiterAccessTokenPath); + if (!ResolvedCloudAccessToken.empty()) + { + Writer.AddString("access-token"sv, ResolvedCloudAccessToken); + } + } + if (!m_JupiterAccessTokenEnv.empty()) + { + std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_JupiterAccessTokenEnv); + + if (!ResolvedCloudAccessTokenEnv.empty()) + { + Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv); + } + else + { + Writer.AddString("access-token-env"sv, m_JupiterAccessTokenEnv); + } + } + if (m_JupiterAssumeHttp2) + { + Writer.AddBool("assumehttp2"sv, true); + } + } + Writer.EndObject(); // "builds" + SourceDescription = fmt::format("[builds] {}/{}/{}/{}", m_CloudUrl, m_JupiterNamespace, m_JupiterBucket, m_BuildsId); } if (!m_ZenUrl.empty()) { diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h index 3a9d5dcb8..e66e98414 100644 --- a/src/zen/cmds/projectstore_cmd.h +++ b/src/zen/cmds/projectstore_cmd.h @@ -103,18 +103,25 @@ private: bool m_DisableBlocks = false; bool m_Async = false; + std::string m_JupiterNamespace; + std::string m_JupiterBucket; + std::string m_JupiterOpenIdProvider; + std::string m_JupiterAccessToken; + std::string m_JupiterAccessTokenEnv; + std::string m_JupiterAccessTokenPath; + bool m_JupiterAssumeHttp2 = false; + bool m_JupiterDisableTempBlocks = false; + std::string m_CloudUrl; - std::string m_CloudNamespace; - std::string m_CloudBucket; std::string m_CloudKey; std::string m_BaseCloudKey; - std::string m_CloudOpenIdProvider; - std::string m_CloudAccessToken; - std::string m_CloudAccessTokenEnv; - std::string m_CloudAccessTokenPath; - bool m_CloudAssumeHttp2 = false; - bool m_CloudDisableTempBlocks = false; - bool m_IgnoreMissingAttachments = false; + + std::string m_BuildsUrl; + std::string m_BuildsId; + std::string m_BuildsMetadataPath; + std::string m_BuildsMetadata; + + bool m_IgnoreMissingAttachments = false; std::string m_ZenUrl; std::string m_ZenProjectName; @@ -152,15 +159,19 @@ private: bool m_Clean = false; bool m_PlainProgress = false; + std::string m_JupiterNamespace; + std::string m_JupiterBucket; + std::string m_JupiterOpenIdProvider; + std::string m_JupiterAccessToken; + std::string m_JupiterAccessTokenEnv; + std::string m_JupiterAccessTokenPath; + bool m_JupiterAssumeHttp2 = false; + std::string m_CloudUrl; - std::string m_CloudNamespace; - std::string m_CloudBucket; std::string m_CloudKey; - std::string m_CloudOpenIdProvider; - std::string m_CloudAccessToken; - std::string m_CloudAccessTokenEnv; - std::string m_CloudAccessTokenPath; - bool m_CloudAssumeHttp2 = false; + + std::string m_BuildsUrl; + std::string m_BuildsId; std::string m_ZenUrl; std::string m_ZenProjectName; diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp new file mode 100644 index 000000000..6d0d51a60 --- /dev/null +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -0,0 +1,561 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "buildsremoteprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compress.h> +#include <zencore/fmtutils.h> + +#include <upstream/jupiter.h> +#include <zenhttp/auth/authmgr.h> + +namespace zen { + +using namespace std::literals; + +static const std::string_view OplogContainerPartName = "oplogcontainer"sv; + +class BuildsRemoteStore : public RemoteProjectStore +{ +public: + BuildsRemoteStore(Ref<CloudCacheClient>&& CloudClient, + std::string_view Namespace, + std::string_view Bucket, + const Oid& BuildId, + const IoBuffer& MetaData, + bool ForceDisableBlocks, + bool ForceDisableTempBlocks, + const std::filesystem::path& TempFilePath) + : m_CloudClient(std::move(CloudClient)) + , m_Namespace(Namespace) + , m_Bucket(Bucket) + , m_BuildId(BuildId) + , m_MetaData(MetaData) + , m_TempFilePath(TempFilePath) + { + m_MetaData.MakeOwned(); + if (ForceDisableBlocks) + { + m_EnableBlocks = false; + } + if (ForceDisableTempBlocks) + { + m_UseTempBlocks = false; + } + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_EnableBlocks, + .UseTempBlockFiles = m_UseTempBlocks, + .AllowChunking = true, + .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId), + .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)}; + } + + virtual Stats GetStats() const override + { + return {.m_SentBytes = m_SentBytes.load(), + .m_ReceivedBytes = m_ReceivedBytes.load(), + .m_RequestTimeNS = m_RequestTimeNS.load(), + .m_RequestCount = m_RequestCount.load(), + .m_PeakSentBytes = m_PeakSentBytes.load(), + .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), + .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; + } + + virtual CreateContainerResult CreateContainer() override + { + ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); + + CloudCacheSession Session(m_CloudClient.Get()); + + IoBuffer Payload = m_MetaData; + Payload.SetContentType(ZenContentType::kCbObject); + CloudCacheResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload); + AddStats(PutResult); + + CreateContainerResult Result{ConvertResult(PutResult)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + Result.Reason); + } + m_OplogBuildPartId = Oid::NewOid(); + return Result; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + CloudCacheSession Session(m_CloudClient.Get()); + PutBuildPartResult PutResult = + Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload); + AddStats(PutResult); + + SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Result.Reason); + } + + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) override + { + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + CloudCacheSession Session(m_CloudClient.Get()); + + CloudCacheResult PutResult = + Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, ZenContentType::kCompressedBinary, Payload); + AddStats(PutResult); + + SaveAttachmentResult Result{ConvertResult(PutResult)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + RawHash, + Result.Reason); + return Result; + } + + if (Block.BlockHash == RawHash) + { + ZEN_ASSERT(Block.ChunkLengths.size() == Block.ChunkHashes.size()); + CbObjectWriter Writer; + Writer.AddHash("rawHash"sv, RawHash); + Writer.BeginArray("rawHashes"sv); + { + for (const IoHash& ChunkHash : Block.ChunkHashes) + { + Writer.AddHash(ChunkHash); + } + } + Writer.EndArray(); + Writer.BeginArray("chunkLengths"); + { + for (uint32_t ChunkSize : Block.ChunkLengths) + { + Writer.AddInteger(ChunkSize); + } + } + Writer.EndArray(); + Writer.BeginArray("chunkOffsets"); + { + ZEN_ASSERT(Block.FirstChunkOffset != (uint32_t)-1); + uint32_t Offset = Block.FirstChunkOffset; + for (uint32_t ChunkSize : Block.ChunkLengths) + { + Writer.AddInteger(Offset); + Offset += ChunkSize; + } + } + Writer.EndArray(); + + Writer.BeginObject("metadata"sv); + { + Writer.AddString("createdBy", "zenserver"); + } + Writer.EndObject(); + + IoBuffer MetaPayload = Writer.Save().GetBuffer().AsIoBuffer(); + MetaPayload.SetContentType(ZenContentType::kCbObject); + CloudCacheResult PutMetaResult = + Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload); + AddStats(PutMetaResult); + RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult); + if (MetaDataResult.ErrorCode) + { + ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + RawHash, + MetaDataResult.Reason); + } + } + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + SaveAttachmentsResult Result; + for (const SharedBuffer& Chunk : Chunks) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); + if (ChunkResult.ErrorCode) + { + return SaveAttachmentsResult{ChunkResult}; + } + } + return Result; + } + + virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override + { + ZEN_UNUSED(RawHash); + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + + CloudCacheSession Session(m_CloudClient.Get()); + FinalizeBuildPartResult FinalizeRefResult = + Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash); + AddStats(FinalizeRefResult); + + FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Result.Reason); + } + else if (Result.Needs.empty()) + { + CloudCacheResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId); + AddStats(FinalizeBuildResult); + FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds; + Result = {ConvertResult(FinalizeBuildResult)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + FinalizeBuildResult.Reason); + } + } + return Result; + } + + virtual LoadContainerResult LoadContainer() override + { + ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); + + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId); + AddStats(GetBuildResult); + LoadContainerResult Result{ConvertResult(GetBuildResult)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + Result.Reason); + return Result; + } + CbObject BuildObject = LoadCompactBinaryObject(GetBuildResult.Response); + if (!BuildObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId); + return Result; + } + CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); + if (!PartsObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId); + return Result; + } + m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); + if (m_OplogBuildPartId == Oid::Zero) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + OplogContainerPartName); + return Result; + } + + CloudCacheResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); + AddStats(GetBuildPartResult); + Result = {ConvertResult(GetBuildResult)}; + Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds; + if (Result.ErrorCode) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Result.Reason); + return Result; + } + + CbObject ContainerObject = LoadCompactBinaryObject(GetBuildPartResult.Response); + if (!ContainerObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId); + return Result; + } + Result.ContainerObject = std::move(ContainerObject); + return Result; + } + + virtual GetKnownBlocksResult GetKnownBlocks() override + { + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); + AddStats(FindResult); + GetKnownBlocksResult Result{ConvertResult(FindResult)}; + if (Result.ErrorCode) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + Result.Reason); + return Result; + } + CbObject BlocksObject = LoadCompactBinaryObject(FindResult.Response); + if (!BlocksObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv, + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId); + return Result; + } + + CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); + Result.Blocks.reserve(Blocks.Num()); + for (CbFieldView BlockView : Blocks) + { + CbObjectView BlockObject = BlockView.AsObjectView(); + IoHash BlockHash = BlockObject["rawHash"sv].AsHash(); + if (BlockHash != IoHash::Zero) + { + CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView(); + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkView : ChunksArray) + { + ChunkHashes.push_back(ChunkView.AsHash()); + } + Result.Blocks.emplace_back(Block{.BlockHash = BlockHash, .ChunkHashes = ChunkHashes}); + } + } + return Result; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath); + AddStats(GetResult); + + LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; + if (GetResult.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}&{}/{}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId, + RawHash, + Result.Reason); + } + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + LoadAttachmentsResult Result; + for (const IoHash& Hash : RawHashes) + { + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + if (ChunkResult.ErrorCode) + { + return LoadAttachmentsResult{ChunkResult}; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); + } + return Result; + } + +private: + void AddStats(const CloudCacheResult& Result) + { + m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes)); + m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes)); + m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000)); + SetAtomicMax(m_PeakSentBytes, Result.SentBytes); + SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes); + if (Result.ElapsedSeconds > 0.0) + { + uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); + SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); + } + + m_RequestCount.fetch_add(1); + } + + static Result ConvertResult(const CloudCacheResult& Response) + { + std::string Text; + int32_t ErrorCode = 0; + if (Response.ErrorCode != 0 || !Response.Success) + { + if (Response.Response) + { + HttpContentType ContentType = Response.Response.GetContentType(); + if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON) + { + ExtendableStringBuilder<256> SB; + SB.Append("\n"); + SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), + Response.Response.GetSize())); + Text = SB.ToString(); + } + else if (ContentType == ZenContentType::kCbObject) + { + ExtendableStringBuilder<256> SB; + SB.Append("\n"); + CompactBinaryToJson(Response.Response.GetView(), SB); + Text = SB.ToString(); + } + } + } + if (Response.ErrorCode != 0) + { + ErrorCode = Response.ErrorCode; + } + else if (!Response.Success) + { + ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + } + return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; + } + + Ref<CloudCacheClient> m_CloudClient; + const std::string m_Namespace; + const std::string m_Bucket; + const Oid m_BuildId; + IoBuffer m_MetaData; + Oid m_OplogBuildPartId = Oid::Zero; + std::filesystem::path m_TempFilePath; + bool m_EnableBlocks = true; + bool m_UseTempBlocks = true; + + std::atomic_uint64_t m_SentBytes = {}; + std::atomic_uint64_t m_ReceivedBytes = {}; + std::atomic_uint64_t m_RequestTimeNS = {}; + std::atomic_uint64_t m_RequestCount = {}; + std::atomic_uint64_t m_PeakSentBytes = {}; + std::atomic_uint64_t m_PeakReceivedBytes = {}; + std::atomic_uint64_t m_PeakBytesPerSec = {}; +}; + +std::shared_ptr<RemoteProjectStore> +CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume https URL + Url = fmt::format("https://{}"sv, Url); + } + CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv, + .ServiceUrl = Url, + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(1800000), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = true, + .RetryCount = 4}; + // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + // 2) Access token as parameter in request + // 3) Environment variable (different win vs linux/mac) + // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + if (!Options.OpenIdProvider.empty()) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else if (!Options.AccessToken.empty()) + { + TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() { + return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()}; + }); + } + else + { + TokenProvider = CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken("Default"); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + + Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); + + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(CloudClient), + Options.Namespace, + Options.Bucket, + Options.BuildId, + Options.MetaData, + Options.ForceDisableBlocks, + Options.ForceDisableTempBlocks, + TempFilePath); + return RemoteStore; +} + +} // namespace zen diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h new file mode 100644 index 000000000..8b2c6c8c8 --- /dev/null +++ b/src/zenserver/projectstore/buildsremoteprojectstore.h @@ -0,0 +1,29 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +class AuthMgr; + +struct BuildsRemoteStoreOptions : RemoteStoreOptions +{ + std::string Url; + std::string Namespace; + std::string Bucket; + Oid BuildId; + std::string OpenIdProvider; + std::string AccessToken; + AuthMgr& AuthManager; + bool ForceDisableBlocks = false; + bool ForceDisableTempBlocks = false; + bool AssumeHttp2 = false; + IoBuffer MetaData; +}; + +std::shared_ptr<RemoteProjectStore> CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, + const std::filesystem::path& TempFilePath); + +} // namespace zen diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index 7d6c43c1e..0fe739a12 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -41,7 +41,6 @@ public: .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = m_Name, - .BaseContainerName = m_OptionalBaseName, .Description = fmt::format("[file] {}/{}{}{}"sv, m_OutputPath, m_Name, m_OptionalBaseName.empty() ? "" : " Base: ", m_OptionalBaseName)}; } @@ -57,6 +56,12 @@ public: .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } + virtual CreateContainerResult CreateContainer() override + { + // Nothing to do here + return {}; + } + virtual SaveResult SaveContainer(const IoBuffer& Payload) override { Stopwatch Timer; @@ -101,7 +106,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override { Stopwatch Timer; SaveAttachmentResult Result; @@ -139,7 +144,7 @@ public: for (const SharedBuffer& Chunk : Chunks) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); - SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); if (ChunkResult.ErrorCode) { ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; @@ -154,29 +159,42 @@ public: virtual FinalizeResult FinalizeContainer(const IoHash&) override { return {}; } virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Name); } - virtual LoadContainerResult LoadBaseContainer() override + + virtual GetKnownBlocksResult GetKnownBlocks() override { if (m_OptionalBaseName.empty()) { - return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; + return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; } - return LoadContainer(m_OptionalBaseName); - } - - virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) override - { - Stopwatch Timer; - HasAttachmentsResult Result; - for (const IoHash& RawHash : RawHashes) + LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseName); + if (LoadResult.ErrorCode) + { + return GetKnownBlocksResult{LoadResult}; + } + Stopwatch Timer; + std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject); + if (BlockHashes.empty()) + { + return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), + .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; + } + std::vector<IoHash> ExistingBlockHashes; + for (const IoHash& RawHash : BlockHashes) { std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!std::filesystem::is_regular_file(ChunkPath)) + if (std::filesystem::is_regular_file(ChunkPath)) { - Result.Needs.insert(RawHash); + ExistingBlockHashes.push_back(RawHash); } } - AddStats(0, 0, Timer.GetElapsedTimeUs() * 1000); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + if (ExistingBlockHashes.empty()) + { + return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), + .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; + } + std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); + GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; + Result.Blocks = std::move(KnownBlocks); return Result; } diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index d926499c4..f4fe578ff 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -46,7 +46,6 @@ public: .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key), - .BaseContainerName = m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key), .Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, @@ -66,6 +65,12 @@ public: .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } + virtual CreateContainerResult CreateContainer() override + { + // Nothing to do here + return {}; + } + virtual SaveResult SaveContainer(const IoBuffer& Payload) override { CloudCacheSession Session(m_CloudClient.Get()); @@ -85,7 +90,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override { CloudCacheSession Session(m_CloudClient.Get()); CloudCacheResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); @@ -109,7 +114,7 @@ public: for (const SharedBuffer& Chunk : Chunks) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); - SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); if (ChunkResult.ErrorCode) { return SaveAttachmentsResult{ChunkResult}; @@ -139,31 +144,58 @@ public: virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Key); } - virtual LoadContainerResult LoadBaseContainer() override + virtual GetKnownBlocksResult GetKnownBlocks() override { if (m_OptionalBaseKey == IoHash::Zero) { - return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; + return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; + } + LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseKey); + if (LoadResult.ErrorCode) + { + return GetKnownBlocksResult{LoadResult}; + } + std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject); + if (BlockHashes.empty()) + { + return GetKnownBlocksResult{ + {.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}}; } - return LoadContainer(m_OptionalBaseKey); - } - virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) override - { CloudCacheSession Session(m_CloudClient.Get()); CloudCacheExistsResult ExistsResult = - Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(RawHashes.begin(), RawHashes.end())); + Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end())); AddStats(ExistsResult); - HasAttachmentsResult Result{ConvertResult(ExistsResult), - std::unordered_set<IoHash, IoHash::Hasher>(ExistsResult.Needs.begin(), ExistsResult.Needs.end())}; if (ExistsResult.ErrorCode) { - Result.Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), - m_Namespace, - Result.Reason); + return GetKnownBlocksResult{{.ErrorCode = ExistsResult.ErrorCode, + .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds, + .Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'", + m_CloudClient->ServiceUrl(), + m_Namespace, + ExistsResult.Reason)}}; + } + + Stopwatch Timer; + std::vector<IoHash> ExistingBlockHashes; + for (const IoHash& RawHash : BlockHashes) + { + if (!ExistsResult.Needs.contains(RawHash)) + { + ExistingBlockHashes.push_back(RawHash); + } } + if (ExistingBlockHashes.empty()) + { + return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), + .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}}; + } + std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); + + GetKnownBlocksResult Result{ + {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}}; + Result.Blocks = std::move(KnownBlocks); return Result; } @@ -323,12 +355,21 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi .AssumeHttp2 = Options.AssumeHttp2, .AllowResume = true, .RetryCount = 4}; - // 1) Access token as parameter in request - // 2) Environment variable (different win vs linux/mac) - // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + // 2) Access token as parameter in request + // 3) Environment variable (different win vs linux/mac) + // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider std::unique_ptr<CloudCacheTokenProvider> TokenProvider; - if (!Options.AccessToken.empty()) + if (!Options.OpenIdProvider.empty()) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else if (!Options.AccessToken.empty()) { TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() { return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()}; @@ -336,11 +377,10 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi } else { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { - AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); + TokenProvider = CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken("Default"); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); } Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index f25042a62..63a80fbd8 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -24,6 +24,7 @@ #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> +#include "buildsremoteprojectstore.h" #include "fileremoteprojectstore.h" #include "jupiterremoteprojectstore.h" #include "remoteprojectstore.h" @@ -266,6 +267,72 @@ namespace { RemoteStore = CreateZenRemoteStore(Options, TempFilePath); } + if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds) + { + std::string_view BuildsServiceUrl = Builds["url"sv].AsString(); + if (BuildsServiceUrl.empty()) + { + return {nullptr, "Missing service url"}; + } + + std::string Url = cpr::util::urlDecode(std::string(BuildsServiceUrl)); + std::string_view Namespace = Builds["namespace"sv].AsString(); + if (Namespace.empty()) + { + return {nullptr, "Missing namespace"}; + } + std::string_view Bucket = Builds["bucket"sv].AsString(); + if (Bucket.empty()) + { + return {nullptr, "Missing bucket"}; + } + std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString(); + std::string AccessToken = std::string(Builds["access-token"sv].AsString()); + if (AccessToken.empty()) + { + std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString(); + if (!AccessTokenEnvVariable.empty()) + { + AccessToken = GetEnvVariable(AccessTokenEnvVariable); + } + } + std::string_view BuildIdParam = Builds["buildsid"sv].AsString(); + if (BuildIdParam.empty()) + { + return {nullptr, "Missing build id"}; + } + if (BuildIdParam.length() != Oid::StringLength) + { + return {nullptr, "Invalid build id"}; + } + Oid BuildId = Oid::FromHexString(BuildIdParam); + if (BuildId == Oid::Zero) + { + return {nullptr, "Invalid build id string"}; + } + + bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false); + bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false); + bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false); + + MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView(); + IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize()); + + BuildsRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + Url, + std::string(Namespace), + std::string(Bucket), + BuildId, + std::string(OpenIdProvider), + AccessToken, + AuthManager, + ForceDisableBlocks, + ForceDisableTempBlocks, + AssumeHttp2, + MetaData}; + RemoteStore = CreateBuildsRemoteStore(Options, TempFilePath); + } + if (!RemoteStore) { return {nullptr, "Unknown remote store type"}; @@ -5132,7 +5199,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, /* BuildBlocks */ false, /* IgnoreMissingAttachments */ false, /* AllowChunking*/ false, - [](CompressedBuffer&&, const IoHash&) {}, + [](CompressedBuffer&&, RemoteProjectStore::Block&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, /* EmbedLooseFiles*/ false); @@ -8354,8 +8421,9 @@ TEST_CASE("project.store.block") return CompositeBuffer(SharedBuffer(Buffer)); })); } - CompressedBuffer Block = GenerateBlock(std::move(Chunks)); - CHECK(IterateBlock(Block.Decompress(), [](CompressedBuffer&&, const IoHash&) {})); + RemoteProjectStore::Block Block; + CompressedBuffer BlockBuffer = GenerateBlock(std::move(Chunks), Block); + CHECK(IterateBlock(BlockBuffer.Decompress(), [](CompressedBuffer&&, const IoHash&) {})); } TEST_CASE("project.store.iterateoplog") diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 49403d39c..216b1c4dd 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -143,13 +143,7 @@ namespace remotestore_impl { NiceBytes(Stats.m_PeakReceivedBytes)); } - struct Block - { - IoHash BlockHash; - std::vector<IoHash> ChunksInBlock; - }; - - size_t AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks) + size_t AddBlock(RwLock& BlocksLock, std::vector<RemoteProjectStore::Block>& Blocks) { size_t BlockIndex; { @@ -741,14 +735,14 @@ namespace remotestore_impl { }); }; - void CreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, - std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, - RwLock& SectionsLock, - std::vector<Block>& Blocks, - size_t BlockIndex, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) + void CreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, + RwLock& SectionsLock, + std::vector<RemoteProjectStore::Block>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) { OpSectionsLatch.AddCount(1); WorkerPool.ScheduleWork([&Blocks, @@ -767,16 +761,17 @@ namespace remotestore_impl { try { ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); + Stopwatch Timer; + RemoteProjectStore::Block Block; + CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks), Block); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex].BlockHash = BlockHash; + Blocks[BlockIndex] = Block; } uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - AsyncOnBlock(std::move(CompressedBlock), BlockHash); + AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); ZEN_INFO("Generated block with {} attachments in {} ({})", ChunkCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), @@ -800,12 +795,18 @@ namespace remotestore_impl { std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0; }; + struct CreatedBlock + { + IoBuffer Payload; + RemoteProjectStore::Block Block; + }; + void UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, RemoteProjectStore& RemoteStore, const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments, const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks, - const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks, + const std::unordered_map<IoHash, CreatedBlock, IoHash::Hasher>& CreatedBlocks, const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments, const std::unordered_set<IoHash, IoHash::Hasher>& Needs, bool ForceAll, @@ -927,12 +928,12 @@ namespace remotestore_impl { } try { - bool IsBlock = false; - IoBuffer Payload; + IoBuffer Payload; + RemoteProjectStore::Block Block; if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { - Payload = BlockIt->second; - IsBlock = true; + Payload = BlockIt->second.Payload; + Block = BlockIt->second.Block; } else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) { @@ -953,9 +954,10 @@ namespace remotestore_impl { RemoteResult.GetErrorReason()); return; } + const bool IsBlock = Block.BlockHash == RawHash; size_t PayloadSize = Payload.GetSize(); RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash); + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block)); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); @@ -1176,13 +1178,67 @@ IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuff return true; }; +std::vector<IoHash> +GetBlockHashesFromOplog(CbObjectView ContainerObject) +{ + using namespace std::literals; + std::vector<RemoteProjectStore::Block> Result; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + + std::vector<IoHash> BlockHashes; + BlockHashes.reserve(BlocksArray.Num()); + for (CbFieldView BlockField : BlocksArray) + { + CbObjectView BlockView = BlockField.AsObjectView(); + IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); + BlockHashes.push_back(BlockHash); + } + return BlockHashes; +} + +std::vector<RemoteProjectStore::Block> +GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes) +{ + using namespace std::literals; + std::vector<RemoteProjectStore::Block> Result; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet; + IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end()); + + Result.reserve(IncludeBlockHashes.size()); + for (CbFieldView BlockField : BlocksArray) + { + CbObjectView BlockView = BlockField.AsObjectView(); + IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); + if (IncludeSet.contains(BlockHash)) + { + std::vector<IoHash> ChunkHashes; + CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); + if (BlockHash == IoHash::Zero) + { + continue; + } + ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkField : ChunksArray) + { + ChunkHashes.push_back(ChunkField.AsHash()); + } + Result.push_back({.BlockHash = BlockHash, .ChunkHashes = std::move(ChunkHashes)}); + } + } + return Result; +} + CompressedBuffer -GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks) +GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock) { + const size_t ChunkCount = FetchChunks.size(); + std::vector<SharedBuffer> ChunkSegments; ChunkSegments.resize(1); - ChunkSegments.reserve(1 + FetchChunks.size()); - size_t ChunkCount = FetchChunks.size(); + ChunkSegments.reserve(1 + ChunkCount); + OutBlock.ChunkHashes.reserve(ChunkCount); + OutBlock.ChunkLengths.reserve(ChunkCount); { IoBuffer TempBuffer(ChunkCount * 9); MutableMemoryView View = TempBuffer.GetMutableView(); @@ -1200,6 +1256,8 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks) ChunkSegments.push_back(Segment); } BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr); + OutBlock.ChunkHashes.push_back(It.first); + OutBlock.ChunkLengths.push_back(gsl::narrow<uint32_t>(ChunkSize)); } ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); @@ -1207,7 +1265,8 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks) } CompressedBuffer CompressedBlock = CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); - + OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); + OutBlock.FirstChunkOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + ChunkSegments[0].GetSize()); return CompressedBlock; } @@ -1221,9 +1280,9 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::vector<remotestore_impl::Block>& KnownBlocks, + const std::vector<RemoteProjectStore::Block>& KnownBlocks, WorkerThreadPool& WorkerPool, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles, @@ -1245,9 +1304,9 @@ BuildContainer(CidStore& ChunkStore, std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; - RwLock BlocksLock; - std::vector<remotestore_impl::Block> Blocks; - CompressedBuffer OpsBuffer; + RwLock BlocksLock; + std::vector<RemoteProjectStore::Block> Blocks; + CompressedBuffer OpsBuffer; std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); @@ -1463,7 +1522,7 @@ BuildContainer(CidStore& ChunkStore, return {}; } - auto FindReuseBlocks = [](const std::vector<remotestore_impl::Block>& KnownBlocks, + auto FindReuseBlocks = [](const std::vector<RemoteProjectStore::Block>& KnownBlocks, const std::unordered_set<IoHash, IoHash::Hasher>& Attachments, JobContext* OptionalContext) -> std::vector<size_t> { std::vector<size_t> ReuseBlockIndexes; @@ -1476,14 +1535,14 @@ BuildContainer(CidStore& ChunkStore, for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { - const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); + const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size(); if (BlockAttachmentCount == 0) { continue; } size_t FoundAttachmentCount = 0; - for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) + for (const IoHash& KnownHash : KnownBlock.ChunkHashes) { if (Attachments.contains(KnownHash)) { @@ -1524,8 +1583,8 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) + const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunkHashes) { if (UploadAttachments.erase(KnownHash) == 1) { @@ -1865,8 +1924,8 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) + const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunkHashes) { if (ChunkedHashes.erase(KnownHash) == 1) { @@ -1978,7 +2037,7 @@ BuildContainer(CidStore& ChunkStore, uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs(); std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; auto NewBlock = [&]() { - size_t BlockIndex = AddBlock(BlocksLock, Blocks); + size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); size_t ChunkCount = ChunksInBlock.size(); if (BuildBlocks) { @@ -2000,9 +2059,9 @@ BuildContainer(CidStore& ChunkStore, { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); + Blocks[BlockIndex].ChunkHashes.insert(Blocks[BlockIndex].ChunkHashes.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); } uint64_t NowMS = Timer.GetElapsedTimeMs(); ZEN_INFO("Assembled block {} with {} chunks in {} ({})", @@ -2234,12 +2293,11 @@ BuildContainer(CidStore& ChunkStore, CbObjectWriter OplogContinerWriter; RwLock::SharedLockScope _(BlocksLock); OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); - OplogContinerWriter.BeginArray("blocks"sv); { - for (const remotestore_impl::Block& B : Blocks) + for (const RemoteProjectStore::Block& B : Blocks) { - ZEN_ASSERT(!B.ChunksInBlock.empty()); + ZEN_ASSERT(!B.ChunkHashes.empty()); if (BuildBlocks) { ZEN_ASSERT(B.BlockHash != IoHash::Zero); @@ -2249,7 +2307,7 @@ BuildContainer(CidStore& ChunkStore, OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); OplogContinerWriter.BeginArray("chunks"sv); { - for (const IoHash& RawHash : B.ChunksInBlock) + for (const IoHash& RawHash : B.ChunkHashes) { OplogContinerWriter.AddHash(RawHash); } @@ -2265,7 +2323,7 @@ BuildContainer(CidStore& ChunkStore, { OplogContinerWriter.BeginArray("chunks"sv); { - for (const IoHash& RawHash : B.ChunksInBlock) + for (const IoHash& RawHash : B.ChunkHashes) { OplogContinerWriter.AddBinaryAttachment(RawHash); } @@ -2331,7 +2389,7 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) @@ -2391,21 +2449,21 @@ SaveOplog(CidStore& ChunkStore, CreateDirectories(AttachmentTempPath); } - remotestore_impl::AsyncRemoteResult RemoteResult; - RwLock AttachmentsLock; - std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks; - tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; + remotestore_impl::AsyncRemoteResult RemoteResult; + RwLock AttachmentsLock; + std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; + std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks; + tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; - auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, - const IoHash& BlockHash) { + auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + RemoteProjectStore::Block&& Block) { std::filesystem::path BlockPath = AttachmentTempPath; - BlockPath.append(BlockHash.ToHexString()); + BlockPath.append(Block.BlockHash.ToHexString()); try { IoBuffer BlockBuffer = remotestore_impl::WriteToTempFile(std::move(CompressedBlock), BlockPath); RwLock::ExclusiveLockScope __(AttachmentsLock); - CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); + CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}}); ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize())); } catch (const std::exception& Ex) @@ -2417,8 +2475,11 @@ SaveOplog(CidStore& ChunkStore, } }; - auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { - RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); + auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, + RemoteProjectStore::Block&& Block) { + IoHash BlockHash = Block.BlockHash; + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block)); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); @@ -2448,7 +2509,7 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Found attachment {}", AttachmentHash); }; - std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock; + std::function<void(CompressedBuffer&&, RemoteProjectStore::Block &&)> OnBlock; if (RemoteStoreInfo.UseTempBlockFiles) { OnBlock = MakeTempBlock; @@ -2458,95 +2519,52 @@ SaveOplog(CidStore& ChunkStore, OnBlock = UploadBlock; } - std::vector<remotestore_impl::Block> KnownBlocks; + std::vector<RemoteProjectStore::Block> KnownBlocks; uint64_t TransferWallTimeMS = 0; - if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty()) + RemoteProjectStore::CreateContainerResult ContainerResult = RemoteStore.CreateContainer(); + if (ContainerResult.ErrorCode) { + RemoteProjectStore::Result Result = {.ErrorCode = ContainerResult.ErrorCode, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Text = fmt::format("Failed to create container for oplog '{}' ({}): {}", + RemoteStoreInfo.ContainerName, + ContainerResult.ErrorCode, + ContainerResult.Reason)}; remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName)); - Stopwatch LoadBaseContainerTimer; - RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer(); - TransferWallTimeMS += LoadBaseContainerTimer.GetElapsedTimeMs(); - - if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent)) - { - if (BaseContainerResult.ErrorCode) - { - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments", - RemoteStoreInfo.BaseContainerName, - BaseContainerResult.ErrorCode, - BaseContainerResult.Reason)); - } - else - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Loaded oplog base container in {}", - NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0)))); - - CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView(); - - std::vector<IoHash> BlockHashes; - BlockHashes.reserve(BlocksArray.Num()); - for (CbFieldView BlockField : BlocksArray) - { - CbObjectView BlockView = BlockField.AsObjectView(); - IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); - BlockHashes.push_back(BlockHash); - } - - RemoteProjectStore::HasAttachmentsResult HasResult = RemoteStore.HasAttachments(BlockHashes); - if (HasResult.ErrorCode == 0) - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Checked the existance of {} block{} in remote store, found {} existing blocks in {}", - BlockHashes.size(), - BlockHashes.size() > 1 ? "s"sv : ""sv, - BlockHashes.size() - HasResult.Needs.size(), - NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0)))); - if (HasResult.Needs.size() < BlocksArray.Num()) - { - KnownBlocks.reserve(BlocksArray.Num() - HasResult.Needs.size()); - - const std::unordered_set<IoHash, IoHash::Hasher> MissingBlocks(HasResult.Needs); + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return Result; + } - for (CbFieldView BlockField : BlocksArray) - { - CbObjectView BlockView = BlockField.AsObjectView(); - IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); - if (!MissingBlocks.contains(BlockHash)) - { - std::vector<IoHash> ChunksInBlock; - CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); - if (BlockHash == IoHash::Zero) - { - continue; - } + if (RemoteStoreInfo.CreateBlocks) + { + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching known blocks from '{}'", RemoteStoreInfo.Description)); + Stopwatch GetKnownBlocksTimer; + RemoteProjectStore::GetKnownBlocksResult KnownBlocksResult = RemoteStore.GetKnownBlocks(); + TransferWallTimeMS += GetKnownBlocksTimer.GetElapsedTimeMs(); - ChunksInBlock.reserve(ChunksArray.Num()); - for (CbFieldView ChunkField : ChunksArray) - { - ChunksInBlock.push_back(ChunkField.AsHash()); - } - KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)}); - } - } - } - } - else - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Unable to determine which blocks in base container exist in remote store, assuming none " - "does: '{}', error code : {}", - HasResult.Reason, - HasResult.ErrorCode)); - } - } + if (KnownBlocksResult.ErrorCode == static_cast<int>(HttpResponseCode::NoContent)) + { + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("No known blocks in '{}', uploading all attachments", RemoteStoreInfo.Description)); + } + else if (KnownBlocksResult.ErrorCode) + { + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Failed to get known blocks from '{}' ({}): {}, uploading all attachments", + RemoteStoreInfo.Description, + KnownBlocksResult.ErrorCode, + KnownBlocksResult.Reason)); + } + else + { + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Fetched {} known blocks from '{}' in {}", + KnownBlocksResult.Blocks.size(), + RemoteStoreInfo.Description, + NiceTimeSpanMs(static_cast<uint64_t>(KnownBlocksResult.ElapsedSeconds * 1000.0)))); + KnownBlocks = std::move(KnownBlocksResult.Blocks); } } @@ -2588,15 +2606,22 @@ SaveOplog(CidStore& ChunkStore, RemoteStoreInfo.ContainerName, ChunkCount, BlockCount)); - Stopwatch SaveContainerTimer; - RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); + Stopwatch SaveContainerTimer; + IoBuffer ContainerPayload = OplogContainerObject.GetBuffer().AsIoBuffer(); + ContainerPayload.SetContentType(ZenContentType::kCbObject); + RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(std::move(ContainerPayload)); TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs(); if (ContainerSaveResult.ErrorCode) { RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); + RemoteProjectStore::Result Result = { + .ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Text = fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())}; remotestore_impl::ReportMessage( OptionalContext, fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return Result; } else { diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index f0655fb59..e05cb9923 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -16,6 +16,14 @@ struct ChunkedInfo; class RemoteProjectStore { public: + struct Block + { + IoHash BlockHash; + std::vector<IoHash> ChunkHashes; + std::vector<uint32_t> ChunkLengths; + uint32_t FirstChunkOffset = (uint32_t)-1; + }; + struct Result { int32_t ErrorCode{}; @@ -24,6 +32,10 @@ public: std::string Text; }; + struct CreateContainerResult : public Result + { + }; + struct SaveResult : public Result { std::unordered_set<IoHash, IoHash::Hasher> Needs; @@ -43,11 +55,6 @@ public: { }; - struct HasAttachmentsResult : public Result - { - std::unordered_set<IoHash, IoHash::Hasher> Needs; - }; - struct LoadAttachmentResult : public Result { IoBuffer Bytes; @@ -63,13 +70,17 @@ public: std::vector<std::pair<IoHash, CompressedBuffer>> Chunks; }; + struct GetKnownBlocksResult : public Result + { + std::vector<Block> Blocks; + }; + struct RemoteStoreInfo { bool CreateBlocks; bool UseTempBlockFiles; bool AllowChunking; std::string ContainerName; - std::string BaseContainerName; std::string Description; }; @@ -90,15 +101,15 @@ public: virtual RemoteStoreInfo GetInfo() const = 0; virtual Stats GetStats() const = 0; - virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0; - virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; + virtual CreateContainerResult CreateContainer() = 0; + virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) = 0; + virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; virtual LoadContainerResult LoadContainer() = 0; - virtual LoadContainerResult LoadBaseContainer() = 0; + virtual GetKnownBlocksResult GetKnownBlocks() = 0; virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0; - virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) = 0; virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0; }; @@ -126,7 +137,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer( bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles); @@ -162,7 +173,9 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, bool CleanOplog, JobContext* OptionalContext); -CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks); +CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock); bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); +std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject); +std::vector<RemoteProjectStore::Block> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); } // namespace zen diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index 6b05442b3..566d0d4b2 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -36,7 +36,6 @@ public: .UseTempBlockFiles = false, .AllowChunking = false, .ContainerName = fmt::format("{}/{}", m_Project, m_Oplog), - .BaseContainerName = "", .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; } @@ -51,6 +50,12 @@ public: .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } + virtual CreateContainerResult CreateContainer() override + { + // Nothing to do here + return {}; + } + virtual SaveResult SaveContainer(const IoBuffer& Payload) override { std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog); @@ -88,7 +93,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override { std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary); @@ -231,16 +236,9 @@ public: return Result; } - virtual LoadContainerResult LoadBaseContainer() override - { - return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; - } - - virtual HasAttachmentsResult HasAttachments(const std::span<IoHash>) override + virtual GetKnownBlocksResult GetKnownBlocks() override { - // For zen as remote store we never store blocks so we should never get here - ZEN_ASSERT(false); - return HasAttachmentsResult{}; + return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp index 3e477f053..2ae977f00 100644 --- a/src/zenserver/upstream/jupiter.cpp +++ b/src/zenserver/upstream/jupiter.cpp @@ -378,6 +378,157 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view return Result; } +CloudCacheResult +CloudCacheSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); + return detail::ConvertResponse(Response, "CloudCacheSession::PutBuild"sv); +} + +CloudCacheResult +CloudCacheSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "CloudCacheSession::GetBuild"sv); +} + +CloudCacheResult +CloudCacheSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); + return detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuild"sv); +} + +PutBuildPartResult +CloudCacheSession::PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + + IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); + + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), + Payload, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + + PutBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::PutBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + Result.RawHash = Hash; + } + return Result; +} + +CloudCacheResult +CloudCacheSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +{ + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildPart"sv); +} + +CloudCacheResult +CloudCacheSession::PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + Payload, + ContentType); + return detail::ConvertResponse(Response, "CloudCacheSession::PutBuildBlob"sv); +} + +CloudCacheResult +CloudCacheSession::GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + std::filesystem::path TempFolderPath) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Download( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + TempFolderPath); + return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildBlob"sv); +} + +CloudCacheResult +CloudCacheSession::PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Put( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + Payload); + return detail::ConvertResponse(Response, "CloudCacheSession::PutBlockMetadata"sv); +} + +FinalizeBuildPartResult +CloudCacheSession::FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), + HttpClient::Accept(ZenContentType::kCbObject)); + + FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + return Result; +} + +CloudCacheResult +CloudCacheSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "CloudCacheSession::FindBlocks"sv); +} + /** * An access token provider that holds a token that will never change. */ diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h index 00ba55bad..50e4ad68a 100644 --- a/src/zenserver/upstream/jupiter.h +++ b/src/zenserver/upstream/jupiter.h @@ -81,6 +81,17 @@ struct GetObjectReferencesResult : CloudCacheResult std::set<IoHash> References; }; +struct PutBuildPartResult : CloudCacheResult +{ + std::vector<IoHash> Needs; + IoHash RawHash; +}; + +struct FinalizeBuildPartResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + /** * Context for performing Jupiter operations * @@ -95,6 +106,7 @@ public: ~CloudCacheSession(); CloudCacheResult Authenticate(); + CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); @@ -127,6 +139,42 @@ public: std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + CloudCacheResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); + CloudCacheResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + CloudCacheResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + PutBuildPartResult PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload); + CloudCacheResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + CloudCacheResult PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload); + CloudCacheResult GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + std::filesystem::path TempFolderPath); + CloudCacheResult PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + const IoBuffer& Payload); + FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash); + CloudCacheResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + CloudCacheClient& Client() { return *m_CacheClient; }; private: diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 851b1d125..9f09713ee 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1457,7 +1457,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept if (SetMetaInfo) { // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the - // metadata separately, check if it had time to set the metdata + // metadata separately, check if it had time to set the metadata RwLock::SharedLockScope UpdateIndexLock(m_IndexLock); if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) { @@ -1578,7 +1578,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; { // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the - // metadata separately, check if it had time to set the metdata + // metadata separately, check if it had time to set the metadata RwLock::SharedLockScope UpdateIndexLock(m_IndexLock); if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) { |