diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-09 16:49:51 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-09 07:49:51 -0800 |
| commit | 2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97 (patch) | |
| tree | d631da0746b78cad7140784de4e637bcfb4e1cac | |
| parent | Update README.md (diff) | |
| download | zen-2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97.tar.xz zen-2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97.zip | |
oplog upload/download (#214)
- Feature: Zen server endpoint `prj/{project}/oplog/{log}/chunks` to post multiple attachments in one request.
- Feature: Zen server endpoint `prj/{project}/oplog/{log}/save` to save an oplog container. Accepts `CbObject` containing a compressed oplog and attachment references organized in blocks.
- Feature: Zen server endpoint `prj/{project}/oplog/{log}/load` to request an oplog container. Responds with an `CbObject` containing a compressed oplog and attachment references organized in blocks.
- Feature: Zen server endpoint `{project}/oplog/{log}/rpc` to initiate an import to or export from an external location and other operations. Use either JSon or CbPackage as payload.
- CbObject/JSon RPC format for `import` and `export` methods:
- CbObject RPC format for `getchunks` method, returns CbPackage with the found chunks, if all chunks are found the number of attachments matches number of chunks requested.
- Feature: Zen server `{project}/oplog/{log}/{hash}` now accepts `HttpVerb::kPost` as well as `HttpVerb::kGet`.
- Feature: Zen command line tool `oplog-export` to export an oplog to an external target using the zenserver oplog export endpoint.
- Feature: Zen command line tool `oplog-import` to import an oplog from an external source using the zenserver oplog import endpoint.
22 files changed, 4025 insertions, 92 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 8088d56c3..7f14d73c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,115 @@ - `--gcpath` Absolute path to oplog lifetime marker file (optional) - Feature: Build scripts and tooling to build zen compliant with VFX reference platform CY2022/2021 matching UE linux builds - Feature: added `xmake sln` task which replaces `generate_projects.bat` +- Feature: Zen server endpoint `prj/{project}/oplog/{log}/chunks` to post multiple attachments in one request. +- Feature: Zen server endpoint `prj/{project}/oplog/{log}/save` to save an oplog container. Accepts `CbObject` containing a compressed oplog and attachment references organized in blocks. +- Feature: Zen server endpoint `prj/{project}/oplog/{log}/load` to request an oplog container. Responds with an `CbObject` containing a compressed oplog and attachment references organized in blocks. +- Feature: Zen server endpoint `{project}/oplog/{log}/rpc` to initiate an import to or export from an external location and other operations. Use either JSon or CbPackage as payload. + - CbObject/JSon RPC format for `import` and `export` methods: + ```json + { + "method" : "<method>", + "params" : { + "maxblocksize": "<maxblocksize>", + "maxchunkembedsize": "<maxchunkembedsize>", + "file" : { + "path" : "<file-system-folder-path>", + "name" : "<oplog-file-name>" + }, + "cloud" : { + "url" : "<serviceurl>", + "namespace" : "<namespace>", + "bucket" : "<bucket>", + "key" : "<iohash>", + "openid-provider" : "<provider-id>", + "access-token" : "<access-token>", + "disableblocks" : "<disableblocks>", + "disabletempblocks" : "<disabletempblocks>" + }, + "zen" : { + "url" : "<url>", + "project" : "<projectid>", + "oplog" : "<oplogid>" + } + } + } + ``` + - `"method"`supported methods are `"export"` and `"import"` to import/export an oplog + - `"params"` container for parameters + - `"maxblocksize"` - Optional. The maximum size of a block of attachments, default 134217728 (128 Mb) (export only) + - `"maxchunkembedsize"` - Optional. The maximum size of an attachment to be put in a block, larger attachments will be stored as usual attachments, default 1048576 (1Mb) (export only) + - `"force"` - Optional. Boolean flag to indicate weather attachments should be uploaded/downloaded disregarding prior existance + - External location types are "file" (File system), "cloud" (UE Cloud Storage service) or "zen" (Zen server instance), provide one of those as remote location. + - `"file"` - Optional. Indicates remote location is the local file system + - `"path"` - File system path folder to export to / import from + - `"name"` - File name of oplog output, written into <file-system-folder-path> + - `"cloud"` - Optional. Indicates remote location is UE Cloud Storage service + - `"url"` - Jupiter service endpoint url + - `"namespace"` - Name of namespace to store data to + - `"bucket"` - Name of bucket to store data to + - `"key"` - IoHash key to the stored oplog container + - `"openid-provider"` - Optional. Name of openid provider used to authenticate with, requires that the zen server instance has been provided with a oids refresh token for <provider-id> + - `"access-token"` - Optional. JWS access token to authenticate with + - `"disableblocks"` - Optional. Disable creation of attachments blocks - "true"/"false" (export only) + - `"disabletempblocks"` - Optional. Disable creation of attachments temp blocks forcing upload before oplog container - "true"/"false" (export only) + - `"zen"` - Optional. Indicates remote location is a Zen server instance + - `"url"` - Zen server instance url + - `"project"` - The remote project name (id) + - `"oplog" - The remote oplog name (id) + - CbObject RPC format for `getchunks` method, returns CbPackage with the found chunks, if all chunks are found the number of attachments matches number of chunks requested. + ```json + { + "method" : "getchunks", + "chunks" : [ + "<rawhash>", + ] + } + ``` + - CbPackage RPC format for `putchunks` method, attachments are stored in CidStore + ```json + { + "method" : "putchunks", + } + ``` +- Feature: Zen server `{project}/oplog/{log}/{hash}` now accepts `HttpVerb::kPost` as well as `HttpVerb::kGet`. +- Feature: Zen command line tool `oplog-export` to export an oplog to an external target using the zenserver oplog export endpoint. + - `--project` Project name (id) + - `--oplog` Project name (id) + - `--maxblocksize` The maximum size of a block of attachments (optional) + - `--maxchunkembedsize` The maximum size of an attachment to be put in a block, larger attachments will be stored as usual attachments (optional) + - `--force` Force upload/download of attachments even if they already exist. + - `--file` File system path folder to export to / import from + - `--name` File name of oplog output, written into `--file` path + - `--disableblocks` Disable block creation and save all attachments individually + - `--forcetempblocks` Force creation of temp attachment blocks + - `--cloud` Jupiter service endpoint to export to / import from + - `namespace` Name of namespace to store data to + - `bucket` Name of bucket to store data to + - `key` Key to the stored oplog container (If omitted a default key will be generated based on project/oplog/namespace/bucket) + - `openid-provider` Optional name of openid provider used to authenticate with, requires that the zen server instance has been provided with a oids refresh token for the provider name + - `access-token` Optional JWS access token to authenticate with + - `disableblocks` Disable block creation and save all attachments individually + - `disabletempblocks` Disable temp block creation and upload blocks without waiting for oplog container to be uploaded + - `--zen` Zen server instance url to export to / import from + - `--target-project` The remote project name (id) (optional, defaults to same as `project`) + - `--taret-oplog` The remote oplog name (id) (optional, defaults to same as `olplog`) + - `--clean` Delete and create a new oplog before starting export +- Feature: Zen command line tool `oplog-import` to import an oplog from an external source using the zenserver oplog import endpoint. + - `--project` Project name (id) + - `--oplog` Project name (id) + - `--force` Force upload/download of attachments even if they already exist. + - `--file` File system path folder to export to / import from + - `--name` File name of oplog output, written into `--file` path + - `--cloud` Jupiter service endpoint to export to / import from + - `namespace` Name of namespace to store data to + - `bucket` Name of bucket to store data to + - `key` Key to the stored oplog container (If omitted a default key will be generated based on project/oplog/namespace/bucket) + - `openid-provider` Optional name of openid provider used to authenticate with, requires that the zen server instance has been provided with a oids refresh token for the provider name + - `access-token` Optional JWS access token to authenticate with + - `--zen` Zen server instance url to export to / import from + - `--source-project` The remote project name (id) (optional, defaults to same as `project`) + - `--source-oplog` The remote oplog name (id) (optional, defaults to same as `olplog`) + - `--clean` Delete and create a new oplog before starting import - Improvement: Faster oplog replay - reduces time to open an existing oplog - Improvement: Clearer error messages and logging when requests to project store fails - Changed: Removed remnants of old mesh experiment diff --git a/zen/cmds/projectstore.cpp b/zen/cmds/projectstore.cpp index c53f0bc35..8ecdecaaa 100644 --- a/zen/cmds/projectstore.cpp +++ b/zen/cmds/projectstore.cpp @@ -2,28 +2,13 @@ #include "projectstore.h" -#include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryvalue.h> -#include <zencore/compress.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/stream.h> -#include <zencore/uid.h> -#include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> -#include <zenhttp/httpshared.h> -#include <zenutil/basicfile.h> -#include <zenutil/zenserverprocess.h> - -#include <memory> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> -#include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END /////////////////////////////////////// @@ -32,9 +17,9 @@ DropProjectCommand::DropProjectCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); - m_Options.add_option("", "p", "project", "Namnspace name", cxxopts::value(m_ProjectName), "<projectname>"); - m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogname>"); - m_Options.parse_positional({"{project}", "{oplog}"}); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.parse_positional({"project", "oplog"}); } DropProjectCommand::~DropProjectCommand() = default; @@ -92,9 +77,9 @@ ProjectInfoCommand::ProjectInfoCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); - m_Options.add_option("", "p", "project", "Namnspace name", cxxopts::value(m_ProjectName), "<projectname>"); - m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogname>"); - m_Options.parse_positional({"{project}", "{oplog}"}); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.parse_positional({"project", "oplog"}); } ProjectInfoCommand::~ProjectInfoCommand() = default; @@ -279,3 +264,485 @@ CreateOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg return GetReturnCode(Response); } + +/////////////////////////////////////// + +ExportOplogCommand::ExportOplogCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.add_option("", "", "maxblocksize", "Max size for bundled attachments", cxxopts::value(m_MaxBlockSize), "<blocksize>"); + m_Options.add_option("", + "", + "maxchunkembedsize", + "Max size for attachment to be bundled", + cxxopts::value(m_MaxChunkEmbedSize), + "<chunksize>"); + m_Options.add_option("", "f", "force", "Force export of all attachments", cxxopts::value(m_Force), "<force>"); + m_Options.add_option("", + "", + "disableblocks", + "Disable block creation and save all attachments individually (applies to file and cloud target)", + cxxopts::value(m_DisableBlocks), + "<disable>"); + + m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); + m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); + m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>"); + m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>"); + m_Options + .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>"); + m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>"); + m_Options.add_option("cloud", + "", + "disabletempblocks", + "Disable temp block creation and upload blocks without waiting for oplog container to be uploaded", + cxxopts::value(m_CloudDisableTempBlocks), + "<disable>"); + + m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>"); + m_Options.add_option("zen", "", "target-project", "Zen target project name", cxxopts::value(m_ZenProjectName), "<targetprojectid>"); + m_Options.add_option("zen", "", "target-oplog", "Zen target oplog name", cxxopts::value(m_ZenOplogName), "<targetoplogid>"); + m_Options.add_option("zen", "", "clean", "Delete existing target Zen oplog", cxxopts::value(m_ZenClean), "<clean>"); + + m_Options.add_option("", "", "file", "Local folder path", cxxopts::value(m_FileDirectoryPath), "<path>"); + m_Options.add_option("file", "", "name", "Local file name", cxxopts::value(m_FileName), "<filename>"); + m_Options.add_option("file", + "", + "forcetempblocks", + "Force creation of temp attachment blocks", + cxxopts::value(m_FileForceEnableTempBlocks), + "<forcetempblocks>"); + + m_Options.parse_positional({"project", "oplog"}); +} + +ExportOplogCommand::~ExportOplogCommand() = default; + +int +ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + if (m_ProjectName.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + if (m_OplogName.empty()) + { + ZEN_ERROR("Oplog name must be given"); + return 1; + } + + size_t TargetCount = 0; + TargetCount += m_CloudUrl.empty() ? 0 : 1; + TargetCount += m_ZenUrl.empty() ? 0 : 1; + TargetCount += m_FileDirectoryPath.empty() ? 0 : 1; + if (TargetCount != 1) + { + ZEN_ERROR("Provide one target only"); + ZEN_CONSOLE("{}", m_Options.help({""}).c_str()); + return 1; + } + + cpr::Session Session; + + if (!m_CloudUrl.empty()) + { + if (m_CloudNamespace.empty() || m_CloudBucket.empty()) + { + ZEN_ERROR("Options for cloud target are missing"); + ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str()); + return 1; + } + if (m_CloudKey.empty()) + { + std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket); + zen::IoHash Key = zen::IoHash::HashBuffer(KeyString.data(), KeyString.size()); + m_CloudKey = Key.ToHexString(); + ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey); + } + } + + if (!m_ZenUrl.empty()) + { + if (m_ZenProjectName.empty()) + { + m_ZenProjectName = m_ProjectName; + ZEN_WARN("Using default zen target project id '{}'", m_ZenProjectName); + } + if (m_ZenOplogName.empty()) + { + m_ZenOplogName = m_OplogName; + ZEN_WARN("Using default zen target oplog id '{}'", m_ZenOplogName); + } + + std::string TargetUrlBase = fmt::format("{}/prj", m_ZenUrl); + if (TargetUrlBase.find("://") == std::string::npos) + { + // Assume https URL + TargetUrlBase = fmt::format("http://{}", TargetUrlBase); + } + + Session.SetUrl({fmt::format("{}/{}/oplog/{}", TargetUrlBase, m_ZenProjectName, m_ZenOplogName)}); + cpr::Response Response = Session.Get(); + if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound)) + { + ZEN_WARN("Automatically creating oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName) + Response = Session.Post(); + if (!zen::IsHttpSuccessCode(Response.status_code)) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); + } + } + else if (!zen::IsHttpSuccessCode(Response.status_code)) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); + } + else if (m_ZenClean) + { + ZEN_WARN("Cleaning oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName) + Response = Session.Delete(); + if (!zen::IsHttpSuccessCode(Response.status_code)) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); + } + Response = Session.Post(); + if (!zen::IsHttpSuccessCode(Response.status_code)) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); + } + } + } + + if (!m_FileDirectoryPath.empty()) + { + if (m_FileName.empty()) + { + m_FileName = m_OplogName; + ZEN_WARN("Using default file name '{}'", m_FileName); + } + } + + const std::string SourceUrlBase = fmt::format("{}/prj", m_HostName); + std::string TargetDescription; + Session.SetUrl({fmt::format("{}/{}/oplog/{}/rpc", SourceUrlBase, m_ProjectName, m_OplogName)}); + Session.SetHeader({{"Content-Type", std::string(zen::MapContentTypeToString(zen::HttpContentType::kCbObject))}}); + zen::CbObjectWriter Writer; + Writer.AddString("method"sv, "export"sv); + Writer.BeginObject("params"sv); + { + if (m_MaxBlockSize != 0) + { + Writer.AddInteger("maxblocksize"sv, m_MaxBlockSize); + } + if (m_MaxChunkEmbedSize != 0) + { + Writer.AddInteger("maxchunkembedsize"sv, m_MaxChunkEmbedSize); + } + if (m_Force) + { + Writer.AddBool("force"sv, true); + } + if (!m_FileDirectoryPath.empty()) + { + Writer.BeginObject("file"sv); + { + Writer.AddString("file"sv, m_FileDirectoryPath); + Writer.AddString("name"sv, m_FileName); + if (m_DisableBlocks) + { + Writer.AddBool("disableblocks"sv, true); + } + if (m_FileForceEnableTempBlocks) + { + Writer.AddBool("enabletempblocks"sv, true); + } + } + Writer.EndObject(); // "file" + TargetDescription = fmt::format("[file] '{}/{}'", m_FileDirectoryPath, m_FileName); + } + if (!m_CloudUrl.empty()) + { + Writer.BeginObject("cloud"sv); + { + Writer.AddString("url"sv, m_CloudUrl); + Writer.AddString("namespace"sv, m_CloudNamespace); + Writer.AddString("bucket"sv, m_CloudBucket); + Writer.AddString("key"sv, m_CloudKey); + if (!m_CloudOpenIdProvider.empty()) + { + Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider); + } + if (!m_CloudAccessToken.empty()) + { + Writer.AddString("access-token"sv, m_CloudAccessToken); + } + if (m_DisableBlocks) + { + Writer.AddBool("disableblocks"sv, true); + } + if (m_CloudDisableTempBlocks) + { + Writer.AddBool("disabletempblocks"sv, true); + } + } + Writer.EndObject(); // "cloud" + TargetDescription = fmt::format("[cloud] '{}/{}/{}/{}'", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey); + } + if (!m_ZenUrl.empty()) + { + Writer.BeginObject("zen"sv); + { + Writer.AddString("url"sv, m_ZenUrl); + Writer.AddString("project"sv, m_ZenProjectName); + Writer.AddString("oplog"sv, m_ZenOplogName); + } + Writer.EndObject(); // "zen" + + TargetDescription = fmt::format("[zen] '{}/{}/{}'", m_ZenUrl, m_ZenProjectName, m_ZenOplogName); + } + } + Writer.EndObject(); // "params" + + zen::BinaryWriter MemOut; + Writer.Save(MemOut); + Session.SetBody(cpr::Body{(const char*)MemOut.GetData(), MemOut.GetSize()}); + + ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription); + cpr::Response Response = Session.Post(); + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); +} + +//////////////////////////// + +ImportOplogCommand::ImportOplogCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.add_option("", "", "maxblocksize", "Max size for bundled attachments", cxxopts::value(m_MaxBlockSize), "<blocksize>"); + m_Options.add_option("", + "", + "maxchunkembedsize", + "Max size for attachment to be bundled", + cxxopts::value(m_MaxChunkEmbedSize), + "<chunksize>"); + m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>"); + + m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); + m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); + m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>"); + m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>"); + m_Options + .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>"); + m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>"); + + m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>"); + m_Options.add_option("zen", "", "source-project", "Zen source project name", cxxopts::value(m_ZenProjectName), "<sourceprojectid>"); + m_Options.add_option("zen", "", "source-oplog", "Zen source oplog name", cxxopts::value(m_ZenOplogName), "<sourceoplogid>"); + m_Options.add_option("zen", "", "clean", "Delete existing target Zen oplog", cxxopts::value(m_ZenClean), "<clean>"); + + m_Options.add_option("", "", "file", "Local folder path", cxxopts::value(m_FileDirectoryPath), "<path>"); + m_Options.add_option("file", "", "name", "Local file name", cxxopts::value(m_FileName), "<filename>"); + + m_Options.parse_positional({"project", "oplog"}); +} + +ImportOplogCommand::~ImportOplogCommand() = default; + +int +ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + if (m_ProjectName.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + if (m_OplogName.empty()) + { + ZEN_ERROR("Oplog name must be given"); + return 1; + } + + size_t TargetCount = 0; + TargetCount += m_CloudUrl.empty() ? 0 : 1; + TargetCount += m_ZenUrl.empty() ? 0 : 1; + TargetCount += m_FileDirectoryPath.empty() ? 0 : 1; + if (TargetCount != 1) + { + ZEN_ERROR("Provide one source only"); + ZEN_CONSOLE("{}", m_Options.help({""}).c_str()); + return 1; + } + + cpr::Session Session; + + if (!m_CloudUrl.empty()) + { + if (m_CloudNamespace.empty() || m_CloudBucket.empty()) + { + ZEN_ERROR("Options for cloud source are missing"); + ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str()); + return 1; + } + if (m_CloudKey.empty()) + { + std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket); + zen::IoHash Key = zen::IoHash::HashBuffer(KeyString.data(), KeyString.size()); + m_CloudKey = Key.ToHexString(); + ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey); + } + } + + if (!m_ZenUrl.empty()) + { + if (m_ZenProjectName.empty()) + { + m_ZenProjectName = m_ProjectName; + ZEN_WARN("Using default zen target project id '{}'", m_ZenProjectName); + } + if (m_ZenOplogName.empty()) + { + m_ZenOplogName = m_OplogName; + ZEN_WARN("Using default zen target oplog id '{}'", m_ZenOplogName); + } + } + + if (!m_FileDirectoryPath.empty()) + { + if (m_FileName.empty()) + { + m_FileName = m_OplogName; + ZEN_WARN("Using auto generated file name '{}'", m_FileName); + } + } + + const std::string TargetUrlBase = fmt::format("{}/prj", m_HostName); + Session.SetUrl({fmt::format("{}/{}/oplog/{}", TargetUrlBase, m_ProjectName, m_OplogName)}); + cpr::Response Response = Session.Get(); + if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound)) + { + ZEN_WARN("Automatically creating oplog '{}/{}'", m_ProjectName, m_OplogName) + Response = Session.Post(); + if (!zen::IsHttpSuccessCode(Response.status_code)) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); + } + } + else if (!zen::IsHttpSuccessCode(Response.status_code)) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); + } + else if (m_ZenClean) + { + ZEN_WARN("Cleaning oplog '{}/{}'", m_ProjectName, m_OplogName) + Response = Session.Delete(); + if (!zen::IsHttpSuccessCode(Response.status_code)) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); + } + Response = Session.Post(); + if (!zen::IsHttpSuccessCode(Response.status_code)) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); + } + } + + std::string SourceDescription; + Session.SetUrl(fmt::format("{}/{}/oplog/{}/rpc", TargetUrlBase, m_ProjectName, m_OplogName)); + Session.SetHeader({{"Content-Type", std::string(zen::MapContentTypeToString(zen::HttpContentType::kCbObject))}}); + + zen::CbObjectWriter Writer; + Writer.AddString("method"sv, "import"sv); + Writer.BeginObject("params"sv); + { + if (m_Force) + { + Writer.AddBool("force"sv, true); + } + if (!m_FileDirectoryPath.empty()) + { + Writer.BeginObject("file"sv); + { + Writer.AddString("file"sv, m_FileDirectoryPath); + Writer.AddString("name"sv, m_FileName); + } + Writer.EndObject(); // "file" + SourceDescription = fmt::format("[file] '{}/{}'", m_FileDirectoryPath, m_FileName); + } + if (!m_CloudUrl.empty()) + { + Writer.BeginObject("cloud"sv); + { + Writer.AddString("url"sv, m_CloudUrl); + Writer.AddString("namespace"sv, m_CloudNamespace); + Writer.AddString("bucket"sv, m_CloudBucket); + Writer.AddString("key"sv, m_CloudKey); + if (!m_CloudOpenIdProvider.empty()) + { + Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider); + } + if (!m_CloudAccessToken.empty()) + { + Writer.AddString("access-token"sv, m_CloudAccessToken); + } + } + Writer.EndObject(); // "cloud" + SourceDescription = fmt::format("[cloud] '{}/{}/{}/{}'", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey); + } + if (!m_ZenUrl.empty()) + { + Writer.BeginObject("zen"sv); + { + Writer.AddString("url"sv, m_ZenUrl); + Writer.AddString("project"sv, m_ZenProjectName); + Writer.AddString("oplog"sv, m_ZenOplogName); + } + Writer.EndObject(); // "zen" + SourceDescription = fmt::format("[zen] '{}'", m_ZenUrl); + } + } + Writer.EndObject(); // "params" + + zen::BinaryWriter MemOut; + Writer.Save(MemOut); + Session.SetBody(cpr::Body{(const char*)MemOut.GetData(), MemOut.GetSize()}); + + ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName); + Response = Session.Post(); + + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); +} diff --git a/zen/cmds/projectstore.h b/zen/cmds/projectstore.h index 73cba8f66..af4da548f 100644 --- a/zen/cmds/projectstore.h +++ b/zen/cmds/projectstore.h @@ -70,3 +70,76 @@ private: std::string m_OplogId; std::string m_GcPath; }; + +class ExportOplogCommand : public ZenCmdBase +{ +public: + ExportOplogCommand(); + ~ExportOplogCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{"oplog-export", + "Export project store oplog to cloud (--cloud), file system (--file) or other Zen instance (--zen)"}; + std::string m_HostName; + std::string m_ProjectName; + std::string m_OplogName; + uint64_t m_MaxBlockSize = 0; + uint64_t m_MaxChunkEmbedSize = 0; + bool m_Force = false; + bool m_DisableBlocks = false; + + std::string m_CloudUrl; + std::string m_CloudNamespace; + std::string m_CloudBucket; + std::string m_CloudKey; + std::string m_CloudOpenIdProvider; + std::string m_CloudAccessToken; + bool m_CloudDisableTempBlocks = false; + + std::string m_ZenUrl; + std::string m_ZenProjectName; + std::string m_ZenOplogName; + bool m_ZenClean; + + std::string m_FileDirectoryPath; + std::string m_FileName; + bool m_FileForceEnableTempBlocks = false; +}; + +class ImportOplogCommand : public ZenCmdBase +{ +public: + ImportOplogCommand(); + ~ImportOplogCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{"oplog-import", + "Import project store oplog from cloud (--cloud), file system (--file) or other Zen instance (--zen)"}; + std::string m_HostName; + std::string m_ProjectName; + std::string m_OplogName; + size_t m_MaxBlockSize = 0; + size_t m_MaxChunkEmbedSize = 0; + bool m_Force = false; + + std::string m_CloudUrl; + std::string m_CloudNamespace; + std::string m_CloudBucket; + std::string m_CloudKey; + std::string m_CloudOpenIdProvider; + std::string m_CloudAccessToken; + + std::string m_ZenUrl; + std::string m_ZenProjectName; + std::string m_ZenOplogName; + bool m_ZenClean; + + std::string m_FileDirectoryPath; + std::string m_FileName; +}; diff --git a/zen/zen.cpp b/zen/zen.cpp index 2b6a529fe..9d85680d1 100644 --- a/zen/zen.cpp +++ b/zen/zen.cpp @@ -22,7 +22,6 @@ #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/string.h> -#include <zencore/zencore.h> #include <zenhttp/httpcommon.h> @@ -69,7 +68,7 @@ ZenCmdBase::ParseOptions(int argc, char** argv) cxxopts::ParseResult Result = CmdOptions.parse(argc, argv); if (Result.count("help")) { - printf("%s\n", CmdOptions.help({}).c_str()); + printf("%s\n", CmdOptions.help().c_str()); return false; } if (!Result.unmatched().empty()) @@ -215,6 +214,8 @@ main(int argc, char** argv) PsCommand PsCmd; UpCommand UpCmd; DownCommand DownCmd; + ExportOplogCommand ExportOplogCmd; + ImportOplogCommand ImportOplogCmd; VersionCommand VersionCmd; CacheInfoCommand CacheInfoCmd; DropProjectCommand ProjectDropCmd; @@ -253,6 +254,8 @@ main(int argc, char** argv) {"project-info", &ProjectInfoCmd, "Info on project or project oplog"}, {"project-create", &CreateProjectCmd, "Create a project"}, {"oplog-create", &CreateOplogCmd, "Create a project oplog"}, + {"oplog-export", &ExportOplogCmd, "Export project store oplog"}, + {"oplog-import", &ImportOplogCmd, "Import project store oplog"}, {"gc", &GcCmd, "Garbage collect zen storage"}, {"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"}, #if ZEN_WITH_TESTS diff --git a/zencore/include/zencore/thread.h b/zencore/include/zencore/thread.h index 2aad22061..a9c96d422 100644 --- a/zencore/include/zencore/thread.h +++ b/zencore/include/zencore/thread.h @@ -163,6 +163,15 @@ public: std::ptrdiff_t Remaining() const { return Counter.load(); } + // If you want to add dynamic count, make sure to set the initial counter to 1 + // and then do a CountDown() just before wait to not trigger the event causing + // false positive completion results. + void AddCount(std::ptrdiff_t Count) + { + std::atomic_ptrdiff_t Old = Counter.fetch_add(Count); + ZEN_ASSERT_SLOW(Old > 0); + } + bool Wait(int TimeoutMs = -1) { std::ptrdiff_t Old = Counter.load(); diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 5c7e150ce..d8b135f74 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -15,8 +15,10 @@ #include <zencore/refcount.h> #include <zencore/stream.h> #include <zencore/string.h> +#include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/timer.h> +#include <zencore/xxhash.h> #include <zenhttp/httpclient.h> #include <zenhttp/httpshared.h> #include <zenhttp/websocket.h> @@ -49,6 +51,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <random> #include <span> #include <thread> +#include <typeindex> #include <unordered_map> #if ZEN_PLATFORM_WINDOWS @@ -2323,12 +2326,12 @@ public: ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {} ~ZenServerTestHelper() {} - void SpawnServers() + void SpawnServers(std::string_view AdditionalServerArgs = std::string_view()) { - SpawnServers([](ZenServerInstance&) {}); + SpawnServers([](ZenServerInstance&) {}, AdditionalServerArgs); } - void SpawnServers(auto&& Callback) + void SpawnServers(auto&& Callback, std::string_view AdditionalServerArgs) { ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount); @@ -2343,7 +2346,7 @@ public: Callback(*Instance); - Instance->SpawnServer(13337 + i); + Instance->SpawnServer(13337 + i, AdditionalServerArgs); } for (int i = 0; i < m_ServerCount; ++i) @@ -2530,6 +2533,541 @@ TEST_CASE("websocket.basic") IoDispatcher.Stop(); } +std::string +OidAsString(const Oid& Id) +{ + StringBuilder<25> OidStringBuilder; + Id.ToString(OidStringBuilder); + return OidStringBuilder.ToString(); +} + +CbPackage +CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments) +{ + CbPackage Package; + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("bulkdata"); + for (const auto& Attachment : Attachments) + { + CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; + Object.EndObject(); + + Package.AddAttachment(Attach); + ZEN_DEBUG("Added attachment {}", Attach.GetHash()); + } + Object.EndArray(); + } + Package.SetObject(Object.Save()); + return Package; +}; + +std::vector<std::pair<Oid, CompressedBuffer> > +CreateAttachments(const std::span<const size_t>& Sizes) +{ + std::vector<std::pair<Oid, CompressedBuffer> > Result; + Result.reserve(Sizes.size()); + for (size_t Size : Sizes) + { + std::vector<uint8_t> Data; + Data.resize(Size); + uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); + for (size_t Idx = 0; Idx < Size / 2; ++Idx) + { + DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu); + } + if (Size & 1) + { + Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff); + } + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); + Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); + } + return Result; +} + +cpr::Body +AsBody(const IoBuffer& Payload) +{ + return cpr::Body{(const char*)Payload.GetData(), Payload.Size()}; +}; + +enum CbWriterMeta +{ + BeginObject, + EndObject, + BeginArray, + EndArray +}; + +inline CbWriter& +operator<<(CbWriter& Writer, CbWriterMeta Meta) +{ + switch (Meta) + { + case BeginObject: + Writer.BeginObject(); + break; + case EndObject: + Writer.EndObject(); + break; + case BeginArray: + Writer.BeginArray(); + break; + case EndArray: + Writer.EndArray(); + break; + default: + ZEN_ASSERT(false); + } + return Writer; +} + +TEST_CASE("project.remote") +{ + using namespace std::literals; + + ZenServerTestHelper Servers("remote", 3); + Servers.SpawnServers("--debug"); + + std::vector<Oid> OpIds; + OpIds.reserve(24); + for (size_t I = 0; I < 24; ++I) + { + OpIds.emplace_back(Oid::NewOid()); + } + + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments; + { + std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, + 2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, + 1916, 8210, 2235, 4024, 1582, 5251, 491, 5464, 4607, 8135, 3767, 4045, + 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); + auto It = AttachmentSizes.begin(); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[4]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[5]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[6]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[7]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[8]] = CreateAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[9]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[10]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[11]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[12]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++}); + Attachments[OpIds[13]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[14]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[15]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[16]] = CreateAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[17]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[18]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++}); + Attachments[OpIds[19]] = CreateAttachments(std::initializer_list<size_t>{}); + Attachments[OpIds[20]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[21]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + Attachments[OpIds[22]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++}); + Attachments[OpIds[23]] = CreateAttachments(std::initializer_list<size_t>{*It++}); + ZEN_ASSERT(It == AttachmentSizes.end()); + } + + auto AddOp = [](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) { + XXH3_128Stream KeyHasher; + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash = KeyHasher.GetHash(); + Oid Id; + memcpy(Id.OidBits, &KeyHash, sizeof Id.OidBits); + IoBuffer Buffer = Op.GetBuffer().AsIoBuffer(); + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF); + Ops.insert({Id, OpCoreHash}); + }; + + auto MakeProject = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName) { + CbObjectWriter Project; + Project.AddString("id"sv, ProjectName); + Project.AddString("root"sv, ""sv); + Project.AddString("engine"sv, ""sv); + Project.AddString("project"sv, ""sv); + Project.AddString("projectfile"sv, ""sv); + IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer(); + std::string ProjectRequest = fmt::format("{}/prj/{}", UrlBase, ProjectName); + Session.SetUrl({ProjectRequest}); + Session.SetBody(cpr::Body{(const char*)ProjectPayload.GetData(), ProjectPayload.GetSize()}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + }; + + auto MakeOplog = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) { + std::string CreateOplogRequest = fmt::format("{}/prj/{}/oplog/{}", UrlBase, ProjectName, OplogName); + Session.SetUrl({CreateOplogRequest}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + }; + + auto MakeOp = [](cpr::Session& Session, + std::string_view UrlBase, + std::string_view ProjectName, + std::string_view OplogName, + const CbPackage& OpPackage) { + std::string CreateOpRequest = fmt::format("{}/prj/{}/oplog/{}/new", UrlBase, ProjectName, OplogName); + Session.SetUrl({CreateOpRequest}); + zen::BinaryWriter MemOut; + legacy::SaveCbPackage(OpPackage, MemOut); + Session.SetBody(cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + }; + + cpr::Session Session; + MakeProject(Session, Servers.GetInstance(0).GetBaseUri(), "proj0"); + MakeOplog(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); + + std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps; + for (const Oid& OpId : OpIds) + { + CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]); + CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size()); + AddOp(OpPackage.GetObject(), SourceOps); + MakeOp(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage); + } + + std::vector<IoHash> AttachmentHashes; + AttachmentHashes.reserve(Attachments.size()); + for (const auto& AttachmentOplog : Attachments) + { + for (const auto& Attachment : AttachmentOplog.second) + { + AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash()); + } + } + + auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer { + CbObjectWriter Writer; + Write(Writer); + IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer(); + Result.MakeOwned(); + return Result; + }; + + auto ValidateAttachments = [&MakeCbObjectPayload, &AttachmentHashes, &Servers, &Session](int ServerIndex, + std::string_view Project, + std::string_view Oplog) { + std::string GetChunksRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog); + Session.SetUrl({GetChunksRequest}); + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) { + Writer << "method"sv + << "getchunks"sv; + Writer << "chunks"sv << BeginArray; + for (const IoHash& Chunk : AttachmentHashes) + { + Writer << Chunk; + } + Writer << EndArray; // chunks + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + CbPackage ResponsePackage = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); + CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size()); + }; + + auto ValidateOplog = [&SourceOps, &AddOp, &Servers, &Session](int ServerIndex, std::string_view Project, std::string_view Oplog) { + std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps; + std::vector<CbObject> ResultingOplog; + + std::string GetOpsRequest = + fmt::format("{}/prj/{}/oplog/{}/entries", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog); + Session.SetUrl({GetOpsRequest}); + cpr::Response Response = Session.Get(); + CHECK(IsHttpSuccessCode(Response.status_code)); + + IoBuffer Payload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); + CbObject OplogResonse = LoadCompactBinaryObject(Payload); + CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView(); + + for (CbFieldView OpEntry : EntriesArray) + { + CbObjectView Core = OpEntry.AsObjectView(); + BinaryWriter Writer; + Core.CopyTo(Writer); + MemoryView OpView = Writer.GetView(); + IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); + CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); + AddOp(Op, TargetOps); + } + CHECK(SourceOps == TargetOps); + }; + + SUBCASE("File") + { + ScopedTemporaryDirectory TempDir; + { + std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); + Session.SetUrl({SaveOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << path; + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + { + MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); + MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + std::string LoadOplogRequest = + fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + Session.SetUrl({LoadOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << path; + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + } + + SUBCASE("File disable blocks") + { + ScopedTemporaryDirectory TempDir; + { + std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); + Session.SetUrl({SaveOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + Writer << "disableblocks"sv << true; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + { + MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); + MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + std::string LoadOplogRequest = + fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + Session.SetUrl({LoadOplogRequest}); + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + } + + SUBCASE("File force temp blocks") + { + ScopedTemporaryDirectory TempDir; + { + std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0"); + Session.SetUrl({SaveOplogRequest}); + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + Writer << "enabletempblocks"sv << true; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + { + MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); + MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + std::string LoadOplogRequest = + fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + Session.SetUrl({LoadOplogRequest}); + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "file"sv << BeginObject; + { + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + } + + SUBCASE("Zen") + { + ScopedTemporaryDirectory TempDir; + { + std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri(); + std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri(); + MakeProject(Session, ExportTargetUri, "proj0_copy"); + MakeOplog(Session, ExportTargetUri, "proj0_copy", "oplog0_copy"); + + std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ExportSourceUri, "proj0", "oplog0"); + Session.SetUrl({SaveOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "export"sv; + Writer << "params" << BeginObject; + { + Writer << "maxblocksize"sv << 3072u; + Writer << "maxchunkembedsize"sv << 1296u; + Writer << "force"sv << false; + Writer << "zen"sv << BeginObject; + { + Writer << "url"sv << ExportTargetUri.substr(7); + Writer << "project" + << "proj0_copy"; + Writer << "oplog" + << "oplog0_copy"; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(1, "proj0_copy", "oplog0_copy"); + ValidateOplog(1, "proj0_copy", "oplog0_copy"); + + { + std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri(); + std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri(); + MakeProject(Session, ImportTargetUri, "proj1"); + MakeOplog(Session, ImportTargetUri, "proj1", "oplog1"); + std::string LoadOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ImportTargetUri, "proj1", "oplog1"); + Session.SetUrl({LoadOplogRequest}); + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer << "method"sv + << "import"sv; + Writer << "params" << BeginObject; + { + Writer << "force"sv << false; + Writer << "zen"sv << BeginObject; + { + Writer << "url"sv << ImportSourceUri.substr(7); + Writer << "project" + << "proj0_copy"; + Writer << "oplog" + << "oplog0_copy"; + } + Writer << EndObject; // "file" + } + Writer << EndObject; // "params" + }); + Session.SetBody(AsBody(Payload)); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + cpr::Response Response = Session.Post(); + CHECK(IsHttpSuccessCode(Response.status_code)); + } + ValidateAttachments(2, "proj1", "oplog1"); + ValidateOplog(2, "proj1", "oplog1"); + } +} + # if 0 TEST_CASE("lifetime.owner") { diff --git a/zenserver/auth/oidc.cpp b/zenserver/auth/oidc.cpp index 17b5bac08..d2265c22f 100644 --- a/zenserver/auth/oidc.cpp +++ b/zenserver/auth/oidc.cpp @@ -104,7 +104,7 @@ OidcClient::RefreshToken(std::string_view RefreshToken) if (Response.status_code != 200) { - return {.Reason = std::move(Response.reason)}; + return {.Reason = fmt::format("{} ({})", Response.reason, Response.text)}; } std::string JsonError; diff --git a/zenserver/projectstore/fileremoteprojectstore.cpp b/zenserver/projectstore/fileremoteprojectstore.cpp new file mode 100644 index 000000000..d7a34a6c2 --- /dev/null +++ b/zenserver/projectstore/fileremoteprojectstore.cpp @@ -0,0 +1,235 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "fileremoteprojectstore.h" + +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/timer.h> + +namespace zen { + +using namespace std::literals; + +class LocalExportProjectStore : public RemoteProjectStore +{ +public: + LocalExportProjectStore(std::string_view Name, + const std::filesystem::path& FolderPath, + bool ForceDisableBlocks, + bool ForceEnableTempBlocks) + : m_Name(Name) + , m_OutputPath(FolderPath) + { + if (ForceDisableBlocks) + { + m_EnableBlocks = false; + } + if (ForceEnableTempBlocks) + { + m_UseTempBlocks = true; + } + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_EnableBlocks, + .UseTempBlockFiles = m_UseTempBlocks, + .Description = fmt::format("[file] {}"sv, m_OutputPath)}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + Stopwatch Timer; + SaveResult Result; + + { + CbObject ContainerObject = LoadCompactBinaryObject(Payload); + + ContainerObject.IterateAttachments([&](CbFieldView FieldView) { + IoHash AttachmentHash = FieldView.AsBinaryAttachment(); + std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); + if (!std::filesystem::exists(AttachmentPath)) + { + Result.Needs.insert(AttachmentHash); + } + }); + } + + std::filesystem::path ContainerPath = m_OutputPath; + ContainerPath.append(m_Name); + + CreateDirectories(m_OutputPath); + BasicFile ContainerFile; + ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate); + std::error_code Ec; + ContainerFile.WriteAll(Payload, Ec); + if (Ec) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = Ec.message(); + } + Result.RawHash = IoHash::HashBuffer(Payload); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + Stopwatch Timer; + SaveAttachmentResult Result; + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!std::filesystem::exists(ChunkPath)) + { + try + { + CreateDirectories(ChunkPath.parent_path()); + + BasicFile ChunkFile; + ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate); + size_t Offset = 0; + for (const SharedBuffer& Segment : Payload.GetSegments()) + { + ChunkFile.Write(Segment.GetView(), Offset); + Offset += Segment.GetSize(); + } + } + catch (std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = Ex.what(); + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + Stopwatch Timer; + + for (const SharedBuffer& Chunk : Chunks) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); + if (ChunkResult.ErrorCode) + { + ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return SaveAttachmentsResult{ChunkResult}; + } + } + SaveAttachmentsResult Result; + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual Result FinalizeContainer(const IoHash&) override { return {}; } + + virtual LoadContainerResult LoadContainer() override + { + Stopwatch Timer; + LoadContainerResult Result; + std::filesystem::path ContainerPath = m_OutputPath; + ContainerPath.append(m_Name); + if (!std::filesystem::is_regular_file(ContainerPath)) + { + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = fmt::format("The file {} does not exist"sv, ContainerPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + IoBuffer ContainerPayload; + { + BasicFile ContainerFile; + ContainerFile.Open(ContainerPath, BasicFile::Mode::kRead); + ContainerPayload = ContainerFile.ReadAll(); + } + Result.ContainerObject = LoadCompactBinaryObject(ContainerPayload); + if (!Result.ContainerObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The file {} is not formatted as a compact binary object"sv, ContainerPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + Stopwatch Timer; + LoadAttachmentResult Result; + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!std::filesystem::is_regular_file(ChunkPath)) + { + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = fmt::format("The file {} does not exist"sv, ChunkPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + { + BasicFile ChunkFile; + ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); + Result.Bytes = ChunkFile.ReadAll(); + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + Stopwatch Timer; + LoadAttachmentsResult Result; + for (const IoHash& Hash : RawHashes) + { + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + if (ChunkResult.ErrorCode) + { + ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return LoadAttachmentsResult{ChunkResult}; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); + } + return Result; + } + +private: + std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const + { + ExtendablePathBuilder<128> ShardedPath; + ShardedPath.Append(m_OutputPath.c_str()); + ExtendableStringBuilder<64> HashString; + RawHash.ToHexString(HashString); + const char* str = HashString.c_str(); + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str, str + 3); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 3, str + 5); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 5, str + 40); + + return ShardedPath.ToPath(); + } + + const std::string m_Name; + const std::filesystem::path m_OutputPath; + bool m_EnableBlocks = true; + bool m_UseTempBlocks = false; +}; + +std::unique_ptr<RemoteProjectStore> +CreateFileRemoteStore(const FileRemoteStoreOptions& Options) +{ + std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name, + std::filesystem::path(Options.FolderPath), + Options.ForceDisableBlocks, + Options.ForceEnableTempBlocks); + return RemoteStore; +} + +} // namespace zen diff --git a/zenserver/projectstore/fileremoteprojectstore.h b/zenserver/projectstore/fileremoteprojectstore.h new file mode 100644 index 000000000..68d1eb71e --- /dev/null +++ b/zenserver/projectstore/fileremoteprojectstore.h @@ -0,0 +1,19 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +struct FileRemoteStoreOptions : RemoteStoreOptions +{ + std::filesystem::path FolderPath; + std::string Name; + bool ForceDisableBlocks; + bool ForceEnableTempBlocks; +}; + +std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options); + +} // namespace zen diff --git a/zenserver/projectstore/jupiterremoteprojectstore.cpp b/zenserver/projectstore/jupiterremoteprojectstore.cpp new file mode 100644 index 000000000..66cf3c4f8 --- /dev/null +++ b/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -0,0 +1,244 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "jupiterremoteprojectstore.h" + +#include <zencore/compress.h> +#include <zencore/fmtutils.h> + +#include <auth/authmgr.h> +#include <upstream/jupiter.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class JupiterRemoteStore : public RemoteProjectStore +{ +public: + JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& Key, + bool ForceDisableBlocks, + bool ForceDisableTempBlocks) + : m_CloudClient(CloudClient) + , m_Namespace(Namespace) + , m_Bucket(Bucket) + , m_Key(Key) + { + if (ForceDisableBlocks) + { + m_EnableBlocks = false; + } + if (ForceDisableTempBlocks) + { + m_UseTempBlocks = false; + } + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_EnableBlocks, + .UseTempBlockFiles = m_UseTempBlocks, + .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key)}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + const int32_t MaxAttempts = 3; + PutRefResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); + } + } + + return SaveResult{ConvertResult(Result), {Result.Needs.begin(), Result.Needs.end()} /*, {}*/, IoHash::HashBuffer(Payload)}; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); + } + } + + return SaveAttachmentResult{ConvertResult(Result)}; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + SaveAttachmentsResult Result; + for (const SharedBuffer& Chunk : Chunks) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); + if (ChunkResult.ErrorCode) + { + return SaveAttachmentsResult{ChunkResult}; + } + } + return Result; + } + + virtual Result FinalizeContainer(const IoHash& RawHash) override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); + } + } + return ConvertResult(Result); + } + + virtual LoadContainerResult LoadContainer() override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.GetRef(m_Namespace, m_Bucket, m_Key, ZenContentType::kCbObject); + } + } + + if (Result.ErrorCode || !Result.Success) + { + return LoadContainerResult{ConvertResult(Result)}; + } + + CbObject ContainerObject = LoadCompactBinaryObject(Result.Response); + if (!ContainerObject) + { + return LoadContainerResult{ + RemoteProjectStore::Result{ + .ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + .ElapsedSeconds = Result.ElapsedSeconds, + .Reason = fmt::format("The ref {}/{}/{} is not formatted as a compact binary object"sv, m_Namespace, m_Bucket, m_Key)}, + std::move(ContainerObject)}; + } + + return LoadContainerResult{ConvertResult(Result), std::move(ContainerObject)}; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.GetCompressedBlob(m_Namespace, RawHash); + } + } + return LoadAttachmentResult{ConvertResult(Result), std::move(Result.Response)}; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + LoadAttachmentsResult Result; + for (const IoHash& Hash : RawHashes) + { + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + if (ChunkResult.ErrorCode) + { + return LoadAttachmentsResult{ChunkResult}; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); + } + return Result; + } + +private: + static Result ConvertResult(const CloudCacheResult& Response) + { + std::string Text; + int32_t ErrorCode = 0; + if (Response.ErrorCode != 0) + { + ErrorCode = Response.ErrorCode; + } + else if (!Response.Success) + { + ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + if (Response.Response.GetContentType() == ZenContentType::kText) + { + Text = + std::string(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), Response.Response.GetSize()); + } + } + return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; + } + + Ref<CloudCacheClient> m_CloudClient; + const std::string m_Namespace; + const std::string m_Bucket; + const IoHash m_Key; + bool m_EnableBlocks = true; + bool m_UseTempBlocks = true; +}; + +std::unique_ptr<RemoteProjectStore> +CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume https URL + Url = fmt::format("https://{}"sv, Url); + } + CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv, + .ServiceUrl = Url, + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(60000)}; + // 1) Access token as parameter in request + // 2) Environment variable (different win vs linux/mac) + // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + if (!Options.AccessToken.empty()) + { + TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = Options.AccessToken]() { + return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()}; + }); + } + else + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + + Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); + + std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient), + Options.Namespace, + Options.Bucket, + Options.Key, + Options.ForceDisableBlocks, + Options.ForceDisableTempBlocks); + return RemoteStore; +} + +} // namespace zen diff --git a/zenserver/projectstore/jupiterremoteprojectstore.h b/zenserver/projectstore/jupiterremoteprojectstore.h new file mode 100644 index 000000000..31548af22 --- /dev/null +++ b/zenserver/projectstore/jupiterremoteprojectstore.h @@ -0,0 +1,26 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +class AuthMgr; + +struct JupiterRemoteStoreOptions : RemoteStoreOptions +{ + std::string Url; + std::string Namespace; + std::string Bucket; + IoHash Key; + std::string OpenIdProvider; + std::string AccessToken; + AuthMgr& AuthManager; + bool ForceDisableBlocks; + bool ForceDisableTempBlocks; +}; + +std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options); + +} // namespace zen diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp index a07698d50..82aac1605 100644 --- a/zenserver/projectstore/projectstore.cpp +++ b/zenserver/projectstore/projectstore.cpp @@ -5,32 +5,31 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> -#include <zencore/compactbinaryvalue.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> -#include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> #include <zencore/timer.h> #include <zencore/trace.h> -#include <zencore/workthreadpool.h> #include <zenhttp/httpshared.h> #include <zenstore/caslog.h> +#include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> -#include <zenutil/basicfile.h> -#include "config.h" - -#include <latch> +#include "fileremoteprojectstore.h" +#include "jupiterremoteprojectstore.h" +#include "remoteprojectstore.h" +#include "zenremoteprojectstore.h" ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> #include <xxh3.h> ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> #endif // ZEN_WITH_TESTS namespace zen { @@ -69,6 +68,149 @@ namespace { Sleep(100); } while (true); } + + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params, + AuthMgr& AuthManager, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize) + { + using namespace std::literals; + + std::unique_ptr<RemoteProjectStore> RemoteStore; + + if (CbObjectView File = Params["file"sv].AsObjectView(); File) + { + std::filesystem::path FolderPath(File["path"sv].AsString()); + if (FolderPath.empty()) + { + return {nullptr, "Missing file path"}; + } + std::string_view Name(File["name"sv].AsString()); + if (Name.empty()) + { + return {nullptr, "Missing file name"}; + } + bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); + bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); + + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + FolderPath, + std::string(Name), + ForceDisableBlocks, + ForceEnableTempBlocks}; + RemoteStore = CreateFileRemoteStore(Options); + } + + if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) + { + std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); + if (CloudServiceUrl.empty()) + { + return {nullptr, "Missing service url"}; + } + + std::string Url = cpr::util::urlDecode(std::string(CloudServiceUrl)); + std::string_view Namespace = Cloud["namespace"sv].AsString(); + if (Namespace.empty()) + { + return {nullptr, "Missing namespace"}; + } + std::string_view Bucket = Cloud["bucket"sv].AsString(); + if (Bucket.empty()) + { + return {nullptr, "Missing bucket"}; + } + std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); + std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); + if (AccessToken.empty()) + { +#if PLATFORM_WINDOWS + + CHAR EnvVariableBuffer[1023 + 1]; + DWORD RESULT = GetEnvironmentVariableA("UE-CloudDataCacheAccessToken", EnvVariableBuffer, sizeof(EnvVariableBuffer)); + if (RESULT > 0 && RESULT < sizeof(EnvVariableBuffer)) + { + AccessToken = std::string(EnvVariableBuffer); + } +#endif +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + char* EnvVariable = getenv("UE_CloudDataCacheAccessToken"); + if (EnvVariable) + { + AccessToken = std::string(EnvVariable); + } +#endif + } + std::string_view KeyParam = Cloud["key"sv].AsString(); + if (KeyParam.empty()) + { + return {nullptr, "Missing key"}; + } + if (KeyParam.length() != IoHash::StringLength) + { + return {nullptr, "Invalid key"}; + } + IoHash Key = IoHash::FromHexString(KeyParam); + if (Key == IoHash::Zero) + { + return {nullptr, "Invalid key string"}; + } + bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); + bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); + + JupiterRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + Url, + std::string(Namespace), + std::string(Bucket), + Key, + std::string(OpenIdProvider), + AccessToken, + AuthManager, + ForceDisableBlocks, + ForceDisableTempBlocks}; + RemoteStore = CreateJupiterRemoteStore(Options); + } + + if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) + { + std::string_view Url = Zen["url"sv].AsString(); + std::string_view Project = Zen["project"sv].AsString(); + if (Project.empty()) + { + return {nullptr, "Missing project"}; + } + std::string_view Oplog = Zen["oplog"sv].AsString(); + if (Oplog.empty()) + { + return {nullptr, "Missing oplog"}; + } + ZenRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + std::string(Url), + std::string(Project), + std::string(Oplog)}; + RemoteStore = CreateZenRemoteStore(Options); + } + + if (!RemoteStore) + { + return {nullptr, "Unknown remote store type"}; + } + + return {std::move(RemoteStore), ""}; + } + + std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result) + { + if (Result.ErrorCode == 0) + { + return {HttpResponseCode::OK, Result.Text}; + } + return {static_cast<HttpResponseCode>(Result.ErrorCode), + Result.Reason.empty() ? Result.Text + : Result.Text.empty() ? Result.Reason + : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)}; + } + } // namespace ////////////////////////////////////////////////////////////////////////// @@ -794,7 +936,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) XXH3_128 KeyHash = KeyHasher.GetHash(); RefPtr<OplogStorage> Storage; - { RwLock::SharedLockScope _(m_OplogLock); Storage = m_Storage; @@ -1613,7 +1754,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, if (!FoundLog) { - return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) @@ -1636,7 +1777,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, { IoHash RawHash; uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); ZEN_ASSERT(!Compressed.IsNull()); if (IsOffset) @@ -1679,7 +1820,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, { Size = Chunk.GetSize() - Offset; } - OutChunk = IoBuffer(Chunk, Offset, Size); + OutChunk = IoBuffer(std::move(Chunk), Offset, Size); OutChunk.SetContentType(ContentType); } @@ -1693,8 +1834,6 @@ ProjectStore::GetChunk(const std::string_view ProjectId, ZenContentType AcceptType, IoBuffer& OutChunk) { - using namespace std::literals; - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); if (!Project) { @@ -1705,7 +1844,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId, if (!FoundLog) { - return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } if (Cid.length() != IoHash::StringLength) @@ -1721,25 +1860,401 @@ ProjectStore::GetChunk(const std::string_view ProjectId, return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)}; } - if (AcceptType == HttpContentType::kBinary) + if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); OutChunk = Compressed.Decompress().AsIoBuffer(); - OutChunk.SetContentType(HttpContentType::kBinary); + OutChunk.SetContentType(ZenContentType::kBinary); } else { - OutChunk.SetContentType(HttpContentType::kCompressedBinary); + OutChunk.SetContentType(ZenContentType::kCompressedBinary); } return {HttpResponseCode::OK, {}}; } +std::pair<HttpResponseCode, std::string> +ProjectStore::PutChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view Cid, + ZenContentType ContentType, + IoBuffer&& Chunk) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (Cid.length() != IoHash::StringLength) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk put request for invalid chunk hash '{}'", Cid)}; + } + + const IoHash Hash = IoHash::FromHexString(Cid); + + if (ContentType != HttpContentType::kCompressedBinary) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid content type for chunk '{}'", Cid)}; + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + if (RawHash != Hash) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid payload format for chunk '{}'", Cid)}; + } + + CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash); + return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + CbObject ContainerObject = LoadCompactBinaryObject(Payload); + if (!ContainerObject) + { + return {HttpResponseCode::BadRequest, "Invalid payload format"}; + } + + CidStore& ChunkStore = m_CidStore; + RwLock AttachmentsLock; + std::unordered_set<IoHash, IoHash::Hasher> Attachments; + + auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); }; + auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + if (BlockHash != IoHash::Zero) + { + Attachments.insert(BlockHash); + } + else + { + Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); + } + }; + auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + Attachments.insert(RawHash); + }; + + RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + + if (RemoteResult.ErrorCode) + { + return ConvertResult(RemoteResult); + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + { + for (const IoHash& Hash : Attachments) + { + ZEN_DEBUG("Need attachment {}", Hash); + Cbo << Hash; + } + } + Cbo.EndArray(); // "need" + + OutResponse = Cbo.Save(); + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::ReadOplog(const std::string_view ProjectId, + const std::string_view OplogId, + const HttpServerRequest::QueryParams& Params, + CbObject& OutResponse) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + size_t MaxBlockSize = 128u * 1024u * 1024u; + if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + MaxBlockSize = Value.value(); + } + } + size_t MaxChunkEmbedSize = 1024u * 1024u; + if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + MaxChunkEmbedSize = Value.value(); + } + } + + CidStore& ChunkStore = m_CidStore; + + RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( + ChunkStore, + *Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + false, + [](CompressedBuffer&&, const IoHash) {}, + [](const IoHash&) {}, + [](const std::unordered_set<IoHash, IoHash::Hasher>) {}); + + OutResponse = std::move(ContainerResult.ContainerObject); + return ConvertResult(ContainerResult); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer(); + m_CidStore.AddChunk(Compressed, AttachmentRawHash); + ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize()); + })) + { + return {HttpResponseCode::BadRequest, "Invalid chunk in block"}; + } + + return {HttpResponseCode::OK, {}}; +} + +void +ProjectStore::Rpc(HttpServerRequest& HttpReq, + const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload, + AuthMgr& AuthManager) +{ + using namespace std::literals; + HttpContentType PayloadContentType = HttpReq.RequestContentType(); + CbPackage Package; + CbObject Cb; + switch (PayloadContentType) + { + case HttpContentType::kJSON: + case HttpContentType::kUnknownContentType: + case HttpContentType::kText: + { + std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); + Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); + if (!Cb) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected JSON format"); + } + } + break; + case HttpContentType::kCbObject: + Cb = LoadCompactBinaryObject(Payload); + if (!Cb) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected compact binary format"); + } + break; + case HttpContentType::kCbPackage: + Package = ParsePackageMessage(Payload); + Cb = Package.GetObject(); + if (!Cb) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected package message format"); + } + break; + default: + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); + } + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + + std::string_view Method = Cb["method"sv].AsString(); + + if (Method == "import") + { + std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + else if (Method == "export") + { + std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + else if (Method == "getchunks") + { + CbPackage ResponsePackage; + { + CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("chunks"sv); + for (CbFieldView FieldView : ChunksArray) + { + IoHash RawHash = FieldView.AsHash(); + IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash); + if (ChunkBuffer) + { + ResponseWriter.AddHash(RawHash); + ResponsePackage.AddAttachment( + CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)), RawHash)); + } + } + ResponseWriter.EndArray(); + ResponsePackage.SetObject(ResponseWriter.Save()); + } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else if (Method == "putchunks") + { + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + for (const CbAttachment& Attachment : Attachments) + { + IoHash RawHash = Attachment.GetHash(); + CompressedBuffer Compressed = Attachment.AsCompressedBinary(); + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly); + } + return HttpReq.WriteResponse(HttpResponseCode::OK); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) +{ + using namespace std::literals; + + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + bool Force = Params["force"sv].AsBool(false); + + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = + CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); + + if (RemoteStoreResult.first == nullptr) + { + return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + } + std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", + Project.Identifier, + Oplog.OplogId(), + StoreInfo.Description, + NiceBytes(MaxBlockSize), + NiceBytes(MaxChunkEmbedSize)); + + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *RemoteStore, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + StoreInfo.CreateBlocks, + StoreInfo.UseTempBlockFiles, + Force); + + return ConvertResult(Result); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) +{ + using namespace std::literals; + + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + bool Force = Params["force"sv].AsBool(false); + + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = + CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); + + if (RemoteStoreResult.first == nullptr) + { + return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + } + std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); + RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force); + return ConvertResult(Result); +} + ////////////////////////////////////////////////////////////////////////// -HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) +HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, AuthMgr& AuthMgr) : m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectStore(Projects) +, m_AuthMgr(AuthMgr) { using namespace std::literals; @@ -1918,7 +2433,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) static_cast<int>(Result.first), Result.second); } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); }, HttpVerb::kGet); @@ -2023,33 +2537,60 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& Cid = Req.GetCapture(3); - HttpContentType AcceptType = HttpReq.AcceptContentType(); - - IoBuffer Value; - std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& Cid = Req.GetCapture(3); + HttpContentType AcceptType = HttpReq.AcceptContentType(); + HttpContentType RequestType = HttpReq.RequestContentType(); - if (Result.first == HttpResponseCode::OK) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); - } - else if (Result.first == HttpResponseCode::NotFound) - { - ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); - } - else + switch (Req.ServerRequest().RequestVerb()) { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - static_cast<int>(Result.first), - Result.second); + case HttpVerb::kGet: + { + IoBuffer Value; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value); + + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); + } + else if (Result.first == HttpResponseCode::NotFound) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + case HttpVerb::kPost: + { + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload()); + if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created) + { + return HttpReq.WriteResponse(Result.first); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + break; } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); }, - HttpVerb::kGet); + HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/prep", @@ -2556,6 +3097,67 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) } }, HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete); + + // Push a oplog container + m_Router.RegisterRoute( + "{project}/oplog/{log}/save", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + if (HttpReq.RequestContentType() != HttpContentType::kCbObject) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type"); + } + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + + CbObject Response; + std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Response); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kPost); + + // Pull a oplog container + m_Router.RegisterRoute( + "{project}/oplog/{log}/load", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + if (HttpReq.AcceptContentType() != HttpContentType::kCbObject) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type"); + } + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + + CbObject Response; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Response); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet); + + // Do an rpc style operation on project/oplog + m_Router.RegisterRoute( + "{project}/oplog/{log}/rpc", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + + m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr); + }, + HttpVerb::kPost); } HttpProjectService::~HttpProjectService() @@ -2625,9 +3227,14 @@ namespace testutils { { std::vector<uint8_t> Data; Data.resize(Size); - for (size_t Idx = 0; Idx < Size; ++Idx) + uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); + for (size_t Idx = 0; Idx < Size / 2; ++Idx) { - Data[Idx] = Idx % 255; + DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu); + } + if (Size & 1) + { + Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff); } CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); @@ -2908,6 +3515,28 @@ TEST_CASE("project.store.partial.read") const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); CHECK(FullDataPtr[0] == PartialDataPtr[0]); } + +TEST_CASE("project.store.block") +{ + using namespace std::literals; + using namespace testutils; + + std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, + 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, + 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); + + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); + std::vector<SharedBuffer> Chunks; + Chunks.reserve(AttachmentSizes.size()); + for (const auto& It : AttachmentsWithId) + { + Chunks.push_back(It.second.GetCompressed().Flatten()); + } + CompressedBuffer Block = GenerateBlock(std::move(Chunks)); + IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); + CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); +} + #endif void diff --git a/zenserver/projectstore/projectstore.h b/zenserver/projectstore/projectstore.h index 6b214d5a2..928a74f59 100644 --- a/zenserver/projectstore/projectstore.h +++ b/zenserver/projectstore/projectstore.h @@ -2,18 +2,11 @@ #pragma once -#include <zencore/logging.h> #include <zencore/uid.h> #include <zencore/xxhash.h> #include <zenhttp/httpserver.h> -#include <zenstore/cidstore.h> #include <zenstore/gc.h> -#include <filesystem> -#include <map> -#include <optional> -#include <string> - ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -21,6 +14,9 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { class CbPackage; +class CidStore; +class AuthMgr; +class ScrubContext; struct OplogEntry { @@ -141,13 +137,13 @@ public: std::filesystem::path m_MarkerPath; std::filesystem::path m_TempPath; - mutable RwLock m_OplogLock; - OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address - OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address - OidMap<FileMapEntry> m_FileMap; // file id -> file map entry - int32_t m_ManifestVersion; // File system manifest version - std::map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file - OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key + mutable RwLock m_OplogLock; + OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address + OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address + OidMap<FileMapEntry> m_FileMap; // file id -> file map entry + int32_t m_ManifestVersion; // File system manifest version + tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file + OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key RefPtr<OplogStorage> m_Storage; std::string m_OplogId; @@ -280,6 +276,42 @@ public: ZenContentType AcceptType, IoBuffer& OutChunk); + std::pair<HttpResponseCode, std::string> PutChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view Cid, + ZenContentType ContentType, + IoBuffer&& Chunk); + + std::pair<HttpResponseCode, std::string> WriteOplog(const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload, + CbObject& OutResponse); + + std::pair<HttpResponseCode, std::string> ReadOplog(const std::string_view ProjectId, + const std::string_view OplogId, + const HttpServerRequest::QueryParams& Params, + CbObject& OutResponse); + + std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload); + + void Rpc(HttpServerRequest& HttpReq, + const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload, + AuthMgr& AuthManager); + + std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + CbObjectView&& Params, + AuthMgr& AuthManager); + + std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + CbObjectView&& Params, + AuthMgr& AuthManager); + private: spdlog::logger& m_Log; CidStore& m_CidStore; @@ -312,7 +344,7 @@ private: class HttpProjectService : public HttpService { public: - HttpProjectService(CidStore& Store, ProjectStore* InProjectStore); + HttpProjectService(CidStore& Store, ProjectStore* InProjectStore, AuthMgr& AuthMgr); ~HttpProjectService(); virtual const char* BaseUri() const override; @@ -325,6 +357,7 @@ private: CidStore& m_CidStore; HttpRequestRouter m_Router; Ref<ProjectStore> m_ProjectStore; + AuthMgr& m_AuthMgr; }; void prj_forcelink(); diff --git a/zenserver/projectstore/remoteprojectstore.cpp b/zenserver/projectstore/remoteprojectstore.cpp new file mode 100644 index 000000000..1e6ca51a1 --- /dev/null +++ b/zenserver/projectstore/remoteprojectstore.cpp @@ -0,0 +1,1036 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "remoteprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zencore/workthreadpool.h> +#include <zenstore/cidstore.h> + +namespace zen { + +/* + OplogContainer + Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller + { + CbArray("ops") + { + CbObject Op + (CbFieldType::BinaryAttachment Attachments[]) + (OpData) + } + } + CbArray("blocks") + CbObject + CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File) + CbArray("chunks") + CbFieldType::Hash // Chunk hashes + CbArray("chunks") // Optional, only if we are not creating blocks (Zen) + CbFieldType::BinaryAttachment // Chunk attachment hashes + + CompressedBinary ChunkBlock + { + VarUInt ChunkCount + VarUInt ChunkSizes[ChunkCount] + uint8_t[chunksize])[ChunkCount] + } +*/ + +////////////////////////////// AsyncRemoteResult + +struct AsyncRemoteResult +{ + void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText) + { + int32_t Expected = 0; + if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1)) + { + m_ErrorReason = ErrorReason; + m_ErrorText = ErrorText; + } + } + bool IsError() const { return m_ErrorCode.load() != 0; } + int GetError() const { return m_ErrorCode.load(); }; + const std::string& GetErrorReason() const { return m_ErrorReason; }; + const std::string& GetErrorText() const { return m_ErrorText; }; + RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const + { + return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText}; + } + +private: + std::atomic<int32_t> m_ErrorCode = 0; + std::string m_ErrorReason; + std::string m_ErrorText; +}; + +bool +IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) +{ + IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer(); + + MemoryView BlockView = BlockPayload.GetView(); + const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); + uint32_t NumberSize; + uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); + ReadPtr += NumberSize; + std::vector<uint64_t> ChunkSizes; + ChunkSizes.reserve(ChunkCount); + while (ChunkCount--) + { + ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); + ReadPtr += NumberSize; + } + ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr); + ZEN_ASSERT(TempBufferLength > 0); + for (uint64_t ChunkSize : ChunkSizes) + { + IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize); + IoHash AttachmentRawHash; + uint64_t AttachmentRawSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); + + if (!CompressedChunk) + { + ZEN_ERROR("Invalid chunk in block"); + return false; + } + Visitor(std::move(CompressedChunk), AttachmentRawHash); + ReadPtr += ChunkSize; + ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd()); + } + return true; +}; + +CompressedBuffer +GenerateBlock(std::vector<SharedBuffer>&& Chunks) +{ + size_t ChunkCount = Chunks.size(); + SharedBuffer SizeBuffer; + { + IoBuffer TempBuffer(ChunkCount * 9); + MutableMemoryView View = TempBuffer.GetMutableView(); + uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); + uint8_t* BufferEndPtr = BufferStartPtr; + BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); + auto It = Chunks.begin(); + while (It != Chunks.end()) + { + BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(It->GetSize()), BufferEndPtr); + It++; + } + ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); + ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); + SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); + } + CompositeBuffer AllBuffers(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks))); + + CompressedBuffer CompressedBlock = + CompressedBuffer::Compress(std::move(AllBuffers), OodleCompressor::Mermaid, OodleCompressionLevel::None); + + return CompressedBlock; +} + +struct Block +{ + IoHash BlockHash; + std::vector<IoHash> ChunksInBlock; +}; + +void +CreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<SharedBuffer>&& ChunksInBlock, + RwLock& SectionsLock, + std::vector<Block>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) +{ + OpSectionsLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { + auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + if (!Chunks.empty()) + { + CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + AsyncOnBlock(std::move(CompressedBlock), BlockHash); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex].BlockHash = BlockHash; + } + } + }); +} + +size_t +AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks) +{ + size_t BlockIndex; + { + RwLock::ExclusiveLockScope _(BlocksLock); + BlockIndex = Blocks.size(); + Blocks.resize(BlockIndex + 1); + } + return BlockIndex; +} + +CbObject +BuildContainer(CidStore& ChunkStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + WorkerThreadPool& WorkerPool, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, + AsyncRemoteResult& RemoteResult) +{ + using namespace std::literals; + + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + CbObjectWriter SectionOpsWriter; + SectionOpsWriter.BeginArray("ops"sv); + + size_t OpCount = 0; + + CbObject OplogContainerObject; + { + RwLock BlocksLock; + std::vector<Block> Blocks; + CompressedBuffer OpsBuffer; + + Latch BlockCreateLatch(1); + + std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; + + size_t BlockSize = 0; + std::vector<SharedBuffer> ChunksInBlock; + + std::unordered_set<IoHash, IoHash::Hasher> Attachments; + Oplog.IterateOplog([&Attachments, &SectionOpsWriter, &OpCount](CbObject Op) { + Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert(FieldView.AsAttachment()); }); + (SectionOpsWriter) << Op; + OpCount++; + }); + + for (const IoHash& AttachmentHash : Attachments) + { + IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash); + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {} for op", AttachmentHash), + {}); + ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()); + return {}; + } + uint64_t PayloadSize = Payload.GetSize(); + if (PayloadSize > MaxChunkEmbedSize) + { + if (LargeChunkHashes.insert(AttachmentHash).second) + { + OnLargeAttachment(AttachmentHash); + } + continue; + } + + if (!BlockAttachmentHashes.insert(AttachmentHash).second) + { + continue; + } + + BlockSize += PayloadSize; + if (BuildBlocks) + { + ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload))); + } + else + { + Payload = {}; + } + + if (BlockSize >= MaxBlockSize) + { + size_t BlockIndex = AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { + CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + } + else + { + OnBlockChunks(BlockAttachmentHashes); + } + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); + } + BlockAttachmentHashes.clear(); + ChunksInBlock.clear(); + BlockSize = 0; + } + } + if (BlockSize > 0) + { + size_t BlockIndex = AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { + CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + } + else + { + OnBlockChunks(BlockAttachmentHashes); + } + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); + } + BlockAttachmentHashes.clear(); + ChunksInBlock.clear(); + BlockSize = 0; + } + SectionOpsWriter.EndArray(); // "ops" + + CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); + ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize())); + + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining()); + } + + if (!RemoteResult.IsError()) + { + CbObjectWriter OplogContinerWriter; + RwLock::SharedLockScope _(BlocksLock); + OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); + + OplogContinerWriter.BeginArray("blocks"sv); + { + for (const Block& B : Blocks) + { + ZEN_ASSERT(!B.ChunksInBlock.empty()); + if (BuildBlocks) + { + ZEN_ASSERT(B.BlockHash != IoHash::Zero); + + OplogContinerWriter.BeginObject(); + { + OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : B.ChunksInBlock) + { + OplogContinerWriter.AddHash(RawHash); + } + } + OplogContinerWriter.EndArray(); // "chunks" + } + OplogContinerWriter.EndObject(); + continue; + } + + ZEN_ASSERT(B.BlockHash == IoHash::Zero); + OplogContinerWriter.BeginObject(); + { + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : B.ChunksInBlock) + { + OplogContinerWriter.AddBinaryAttachment(RawHash); + } + } + OplogContinerWriter.EndArray(); + } + OplogContinerWriter.EndObject(); + } + } + OplogContinerWriter.EndArray(); // "blocks"sv + + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& AttachmentHash : LargeChunkHashes) + { + OplogContinerWriter.AddBinaryAttachment(AttachmentHash); + } + } + OplogContinerWriter.EndArray(); // "chunks" + + OplogContainerObject = OplogContinerWriter.Save(); + } + } + return OplogContainerObject; +} + +RemoteProjectStore::LoadContainerResult +BuildContainer(CidStore& ChunkStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks) +{ + // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a + // WorkerThreadPool alive + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + + AsyncRemoteResult RemoteResult; + CbObject ContainerObject = BuildContainer(ChunkStore, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + BuildBlocks, + WorkerPool, + AsyncOnBlock, + OnLargeAttachment, + OnBlockChunks, + RemoteResult); + return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; +} + +RemoteProjectStore::Result +SaveOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + bool UseTempBlocks, + bool ForceUpload) +{ + using namespace std::literals; + + Stopwatch Timer; + + // We are creating a worker thread pool here since we are uploading a lot of attachments in one go + // Doing upload is a rare and transient occation so we don't want to keep a WorkerThreadPool alive. + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + + std::filesystem::path AttachmentTempPath; + if (UseTempBlocks) + { + AttachmentTempPath = Oplog.TempPath(); + AttachmentTempPath.append(".pending"); + CreateDirectories(AttachmentTempPath); + } + + AsyncRemoteResult RemoteResult; + RwLock AttachmentsLock; + std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks; + + auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + const IoHash& BlockHash) { + std::filesystem::path BlockPath = AttachmentTempPath; + BlockPath.append(BlockHash.ToHexString()); + if (!std::filesystem::exists(BlockPath)) + { + IoBuffer BlockBuffer; + try + { + BasicFile BlockFile; + BlockFile.Open(BlockPath, BasicFile::Mode::kTruncateDelete); + uint64_t Offset = 0; + for (const SharedBuffer& Buffer : CompressedBlock.GetCompressed().GetSegments()) + { + BlockFile.Write(Buffer.GetView(), Offset); + Offset += Buffer.GetSize(); + } + void* FileHandle = BlockFile.Detach(); + BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset); + } + catch (std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + Ex.what(), + "Unable to create temp block file"); + return; + } + + BlockBuffer.MarkAsDeleteOnClose(); + { + RwLock::ExclusiveLockScope __(AttachmentsLock); + CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); + } + ZEN_DEBUG("Saved temp block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); + } + }; + + auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { + RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + return; + } + ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); + }; + + std::vector<std::vector<IoHash>> BlockChunks; + auto OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) { + BlockChunks.push_back({Chunks.begin(), Chunks.end()}); + ZEN_DEBUG("Found {} block chunks", Chunks.size()); + }; + + auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) { + { + RwLock::ExclusiveLockScope _(AttachmentsLock); + LargeAttachments.insert(AttachmentHash); + } + ZEN_DEBUG("Found attachment {}", AttachmentHash); + }; + + std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock; + if (UseTempBlocks) + { + OnBlock = MakeTempBlock; + } + else + { + OnBlock = UploadBlock; + } + + CbObject OplogContainerObject = BuildContainer(ChunkStore, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + BuildBlocks, + WorkerPool, + OnBlock, + OnLargeAttachment, + OnBlockChunks, + RemoteResult); + + if (!RemoteResult.IsError()) + { + uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); + uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); + ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount); + RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); + if (ContainerSaveResult.ErrorCode) + { + RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); + ZEN_ERROR("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + } + ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))); + if (!ContainerSaveResult.Needs.empty()) + { + ZEN_INFO("Filtering needed attachments..."); + std::vector<IoHash> NeededLargeAttachments; + std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments; + NeededLargeAttachments.reserve(LargeAttachments.size()); + NeededOtherAttachments.reserve(CreatedBlocks.size()); + if (ForceUpload) + { + NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end()); + } + else + { + for (const IoHash& RawHash : ContainerSaveResult.Needs) + { + if (LargeAttachments.contains(RawHash)) + { + NeededLargeAttachments.push_back(RawHash); + continue; + } + NeededOtherAttachments.insert(RawHash); + } + } + + Latch SaveAttachmentsLatch(1); + if (!NeededLargeAttachments.empty()) + { + ZEN_INFO("Saving large attachments..."); + for (const IoHash& RawHash : NeededLargeAttachments) + { + if (RemoteResult.IsError()) + { + break; + } + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + IoBuffer Payload; + if (auto It = CreatedBlocks.find(RawHash); It != CreatedBlocks.end()) + { + Payload = std::move(It->second); + } + else + { + Payload = ChunkStore.FindChunkByCid(RawHash); + } + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_ERROR("Failed to build container ({}). Reason: '{}'", + RemoteResult.GetErrorReason(), + RemoteResult.GetError()); + return; + } + + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); + } + } + + if (!CreatedBlocks.empty()) + { + ZEN_INFO("Saving created block attachments..."); + for (auto& It : CreatedBlocks) + { + if (RemoteResult.IsError()) + { + break; + } + const IoHash& RawHash = It.first; + if (ForceUpload || NeededOtherAttachments.contains(RawHash)) + { + IoBuffer Payload = It.second; + ZEN_ASSERT(Payload); + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); + } + It.second = {}; + } + } + + if (!BlockChunks.empty()) + { + ZEN_INFO("Saving chunk block attachments..."); + for (const std::vector<IoHash>& Chunks : BlockChunks) + { + if (RemoteResult.IsError()) + { + break; + } + std::vector<IoHash> NeededChunks; + if (ForceUpload) + { + NeededChunks = Chunks; + } + else + { + NeededChunks.reserve(Chunks.size()); + for (const IoHash& Chunk : Chunks) + { + if (NeededOtherAttachments.contains(Chunk)) + { + NeededChunks.push_back(Chunk); + } + } + if (NeededChunks.empty()) + { + continue; + } + } + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &SaveAttachmentsLatch, + &RemoteResult, + &Chunks, + NeededChunks = std::move(NeededChunks), + ForceUpload]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) + { + IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk); + if (!ChunkPayload) + { + RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {}"sv, Chunk), + fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); + ChunkBuffers.clear(); + break; + } + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload))); + } + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Saved {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + }); + } + } + SaveAttachmentsLatch.CountDown(); + while (!SaveAttachmentsLatch.Wait(1000)) + { + ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); + } + SaveAttachmentsLatch.Wait(); + } + + if (!RemoteResult.IsError()) + { + ZEN_INFO("Finalizing oplog container..."); + RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); + if (ContainerFinalizeResult.ErrorCode) + { + RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); + ZEN_ERROR("Failed to finalize oplog container {} ({}). Reason: '{}'", + ContainerSaveResult.RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + } + ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))); + } + } + + RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + ZEN_INFO("Saved oplog {} in {}", + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return Result; +}; + +RemoteProjectStore::Result +SaveOplogContainer(ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment) +{ + using namespace std::literals; + + Stopwatch Timer; + + CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); + for (CbFieldView LargeChunksField : LargeChunksArray) + { + IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); + if (HasAttachment(AttachmentHash)) + { + continue; + } + OnNeedAttachment(AttachmentHash); + }; + + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + for (CbFieldView BlockField : BlocksArray) + { + CbObjectView BlockView = BlockField.AsObjectView(); + IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); + + CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); + if (BlockHash == IoHash::Zero) + { + std::vector<IoHash> NeededChunks; + NeededChunks.reserve(ChunksArray.GetSize()); + for (CbFieldView ChunkField : ChunksArray) + { + IoHash ChunkHash = ChunkField.AsBinaryAttachment(); + if (HasAttachment(ChunkHash)) + { + continue; + } + NeededChunks.emplace_back(ChunkHash); + } + + if (!NeededChunks.empty()) + { + OnNeedBlock(IoHash::Zero, std::move(NeededChunks)); + } + continue; + } + + for (CbFieldView ChunkField : ChunksArray) + { + IoHash ChunkHash = ChunkField.AsHash(); + if (HasAttachment(ChunkHash)) + { + continue; + } + + OnNeedBlock(BlockHash, {}); + break; + } + }; + + MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); + IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); + IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); + + CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); + if (!SectionObject) + { + ZEN_ERROR("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type"); + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), + Timer.GetElapsedTimeMs() / 1000.500, + "Section has unexpected data type", + "Failed to save oplog container"}; + } + + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + for (CbFieldView OpEntry : OpsArray) + { + CbObjectView Core = OpEntry.AsObjectView(); + BinaryWriter Writer; + Core.CopyTo(Writer); + MemoryView OpView = Writer.GetView(); + IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); + CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); + const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op); + if (OpLsn == ProjectStore::Oplog::kInvalidOp) + { + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), + Timer.GetElapsedTimeMs() / 1000.500, + "Failed saving op", + "Failed to save oplog container"}; + } + ZEN_DEBUG("oplog entry #{}", OpLsn); + } + return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; +} + +RemoteProjectStore::Result +LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload) +{ + using namespace std::literals; + + Stopwatch Timer; + + // We are creating a worker thread pool here since we are download a lot of attachments in one go and we dont want to keep a + // WorkerThreadPool alive + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + + std::unordered_set<IoHash, IoHash::Hasher> Attachments; + std::vector<std::vector<IoHash>> ChunksInBlocks; + + RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); + if (LoadContainerResult.ErrorCode) + { + ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode); + return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Reason = LoadContainerResult.Reason, + .Text = LoadContainerResult.Text}; + } + ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))); + + AsyncRemoteResult RemoteResult; + Latch AttachmentsWorkLatch(1); + + auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { + return !ForceDownload && ChunkStore.ContainsChunk(RawHash); + }; + auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult]( + const IoHash& BlockHash, + std::vector<IoHash>&& Chunks) { + if (BlockHash == IoHash::Zero) + { + AttachmentsWorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to attachments with {} chunks ({}). Reason: '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + for (const auto& It : Result.Chunks) + { + ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); + } + }); + return; + } + AttachmentsWorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + ZEN_ERROR("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + + if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); + })) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Invalid format for block {}", BlockHash), + {}); + ZEN_ERROR("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + }); + }; + + auto OnNeedAttachment = + [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) { + if (!Attachments.insert(RawHash).second) + { + return; + } + + AttachmentsWorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode); + return; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); + }); + }; + + RemoteProjectStore::Result Result = + SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + + AttachmentsWorkLatch.CountDown(); + while (!AttachmentsWorkLatch.Wait(1000)) + { + ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining()); + } + AttachmentsWorkLatch.Wait(); + if (Result.ErrorCode == 0) + { + Result = RemoteResult.ConvertResult(); + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + + ZEN_INFO("Loaded oplog {} in {}", + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))); + + return Result; +} + +} // namespace zen diff --git a/zenserver/projectstore/remoteprojectstore.h b/zenserver/projectstore/remoteprojectstore.h new file mode 100644 index 000000000..dcabaedd4 --- /dev/null +++ b/zenserver/projectstore/remoteprojectstore.h @@ -0,0 +1,111 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "projectstore.h" + +#include <unordered_set> + +namespace zen { + +class CidStore; +class WorkerThreadPool; + +class RemoteProjectStore +{ +public: + struct Result + { + int32_t ErrorCode{}; + double ElapsedSeconds{}; + std::string Reason; + std::string Text; + }; + + struct SaveResult : public Result + { + std::unordered_set<IoHash, IoHash::Hasher> Needs; + IoHash RawHash; + }; + + struct SaveAttachmentResult : public Result + { + }; + + struct SaveAttachmentsResult : public Result + { + }; + + struct LoadAttachmentResult : public Result + { + IoBuffer Bytes; + }; + + struct LoadContainerResult : public Result + { + CbObject ContainerObject; + }; + + struct LoadAttachmentsResult : public Result + { + std::vector<std::pair<IoHash, CompressedBuffer>> Chunks; + }; + + struct RemoteStoreInfo + { + bool CreateBlocks; + bool UseTempBlockFiles; + std::string Description; + }; + + virtual ~RemoteProjectStore() {} + + virtual RemoteStoreInfo GetInfo() const = 0; + + virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0; + virtual Result FinalizeContainer(const IoHash& RawHash) = 0; + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; + + virtual LoadContainerResult LoadContainer() = 0; + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0; + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0; +}; + +struct RemoteStoreOptions +{ + size_t MaxBlockSize = 128u * 1024u * 1024u; + size_t MaxChunkEmbedSize = 1024u * 1024u; +}; + +RemoteProjectStore::LoadContainerResult BuildContainer( + CidStore& ChunkStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks); + +RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment); + +RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + bool UseTempBlocks, + bool ForceUpload); + +RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload); + +CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks); +bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); + +} // namespace zen diff --git a/zenserver/projectstore/zenremoteprojectstore.cpp b/zenserver/projectstore/zenremoteprojectstore.cpp new file mode 100644 index 000000000..6ff471ae5 --- /dev/null +++ b/zenserver/projectstore/zenremoteprojectstore.cpp @@ -0,0 +1,341 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenremoteprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compositebuffer.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zenhttp/httpshared.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class ZenRemoteStore : public RemoteProjectStore +{ +public: + ZenRemoteStore(std::string_view HostAddress, + std::string_view Project, + std::string_view Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize) + : m_HostAddress(HostAddress) + , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) + , m_Project(Project) + , m_Oplog(Oplog) + , m_MaxBlockSize(MaxBlockSize) + , m_MaxChunkEmbedSize(MaxChunkEmbedSize) + { + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + Session->SetUrl({SaveRequest}); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); + MemoryView Data(Payload.GetView()); + Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()}); + cpr::Response Response = Session->Post(); + SaveResult Result = SaveResult{ConvertResult(Response)}; + + if (Result.ErrorCode) + { + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); + CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload); + if (!ResponseObject) + { + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); + for (CbFieldView FieldView : NeedsArray) + { + IoHash ChunkHash = FieldView.AsHash(); + Result.Needs.insert(ChunkHash); + } + + Result.RawHash = IoHash::HashBuffer(Payload); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); + Session->SetUrl({SaveRequest}); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); + cpr::Response Response = Session->Post(); + SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + Stopwatch Timer; + + CbPackage RequestPackage; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "putchunks"sv); + RequestWriter.BeginArray("chunks"sv); + { + for (const SharedBuffer& Chunk : Chunks) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize); + RequestWriter.AddHash(RawHash); + RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash)); + } + } + RequestWriter.EndArray(); // "chunks" + RequestPackage.SetObject(RequestWriter.Save()); + } + CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault); + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + Session->SetUrl({SaveRequest}); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); + + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); + cpr::Response Response = Session->Post(); + SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + + CbObject Request; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "getchunks"sv); + RequestWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : RawHashes) + { + RequestWriter.AddHash(RawHash); + } + } + RequestWriter.EndArray(); // "chunks" + Request = RequestWriter.Save(); + } + IoBuffer Payload = Request.GetBuffer().AsIoBuffer(); + Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); + Session->SetUrl(SaveRequest); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}, + {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); + + cpr::Response Response = Session->Post(); + LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + Result.Chunks.reserve(Attachments.size()); + for (const CbAttachment& Attachment : Attachments) + { + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + }; + + virtual Result FinalizeContainer(const IoHash&) override + { + Stopwatch Timer; + + RwLock::ExclusiveLockScope _(SessionsLock); + Sessions.clear(); + return {.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; + } + + virtual LoadContainerResult LoadContainer() override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + Session->SetUrl(SaveRequest); + Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); + Session->SetParameters( + {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}}); + cpr::Response Response = Session->Get(); + + LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size())); + if (!Result.ContainerObject) + { + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + + std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); + Session->SetUrl({LoadRequest}); + Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); + cpr::Response Response = Session->Get(); + LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + +private: + std::unique_ptr<cpr::Session> AllocateSession() + { + RwLock::ExclusiveLockScope _(SessionsLock); + if (Sessions.empty()) + { + Sessions.emplace_back(std::make_unique<cpr::Session>()); + } + std::unique_ptr<cpr::Session> Session = std::move(Sessions.back()); + Sessions.pop_back(); + return Session; + } + + void ReleaseSession(std::unique_ptr<cpr::Session>&& Session) + { + RwLock::ExclusiveLockScope _(SessionsLock); + Sessions.emplace_back(std::move(Session)); + } + + static Result ConvertResult(const cpr::Response& Response) + { + std::string Text; + std::string Reason = Response.reason; + int32_t ErrorCode = 0; + if (Response.error.code != cpr::ErrorCode::OK) + { + ErrorCode = static_cast<int32_t>(Response.error.code); + if (!Response.error.message.empty()) + { + Reason = Response.error.message; + } + } + else if (!IsHttpSuccessCode(Response.status_code)) + { + ErrorCode = static_cast<int32_t>(Response.status_code); + + if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) + { + zen::HttpContentType ContentType = zen::ParseContentType(It->second); + if (ContentType == zen::HttpContentType::kText) + { + Text = Response.text; + } + } + + Reason = fmt::format("{}"sv, Response.status_code); + } + return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text}; + } + + RwLock SessionsLock; + std::vector<std::unique_ptr<cpr::Session>> Sessions; + + const std::string m_HostAddress; + const std::string m_ProjectStoreUrl; + const std::string m_Project; + const std::string m_Oplog; + const size_t m_MaxBlockSize; + const size_t m_MaxChunkEmbedSize; +}; + +std::unique_ptr<RemoteProjectStore> +CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume https URL + Url = fmt::format("http://{}"sv, Url); + } + std::unique_ptr<RemoteProjectStore> RemoteStore = + std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); + return RemoteStore; +} + +} // namespace zen diff --git a/zenserver/projectstore/zenremoteprojectstore.h b/zenserver/projectstore/zenremoteprojectstore.h new file mode 100644 index 000000000..ef9dcad8c --- /dev/null +++ b/zenserver/projectstore/zenremoteprojectstore.h @@ -0,0 +1,18 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +struct ZenRemoteStoreOptions : RemoteStoreOptions +{ + std::string Url; + std::string ProjectId; + std::string OplogId; +}; + +std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options); + +} // namespace zen diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 260b83355..dbb185bec 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -6,6 +6,7 @@ #include "diag/logging.h" #include <zencore/compactbinary.h> +#include <zencore/compositebuffer.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/string.h> @@ -437,6 +438,47 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K } CloudCacheResult +CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) +{ + ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); + + cpr::Response Response = Session.Put(); + ZEN_DEBUG("PUT {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; +} + +CloudCacheResult CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) { ZEN_TRACE_CPU("HordeClient::PutObject"); diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 88ab77247..99e5c530f 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -104,6 +104,7 @@ public: PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index cf771c6de..9eae2761d 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -262,7 +262,7 @@ public: ZEN_INFO("instantiating project service"); m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager); - m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore}); + m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, *m_AuthMgr}); #if ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.ComputeServiceEnabled) diff --git a/zenstore/include/zenstore/scrubcontext.h b/zenstore/include/zenstore/scrubcontext.h index bf906492c..0b884fcc6 100644 --- a/zenstore/include/zenstore/scrubcontext.h +++ b/zenstore/include/zenstore/scrubcontext.h @@ -3,6 +3,7 @@ #pragma once #include <zencore/timer.h> +#include <zenstore/hashkeyset.h> namespace zen { diff --git a/zenutil/include/zenutil/zenserverprocess.h b/zenutil/include/zenutil/zenserverprocess.h index e6b0a6710..3ec4b19b0 100644 --- a/zenutil/include/zenutil/zenserverprocess.h +++ b/zenutil/include/zenutil/zenserverprocess.h @@ -7,8 +7,6 @@ #include <zencore/thread.h> #include <zencore/uid.h> -#include <gsl/gsl-lite.hpp> - #include <atomic> #include <filesystem> #include <optional> |