aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-09 16:49:51 +0100
committerGitHub <[email protected]>2023-02-09 07:49:51 -0800
commit2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97 (patch)
treed631da0746b78cad7140784de4e637bcfb4e1cac
parentUpdate README.md (diff)
downloadzen-2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97.tar.xz
zen-2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97.zip
oplog upload/download (#214)
- Feature: Zen server endpoint `prj/{project}/oplog/{log}/chunks` to post multiple attachments in one request. - Feature: Zen server endpoint `prj/{project}/oplog/{log}/save` to save an oplog container. Accepts `CbObject` containing a compressed oplog and attachment references organized in blocks. - Feature: Zen server endpoint `prj/{project}/oplog/{log}/load` to request an oplog container. Responds with an `CbObject` containing a compressed oplog and attachment references organized in blocks. - Feature: Zen server endpoint `{project}/oplog/{log}/rpc` to initiate an import to or export from an external location and other operations. Use either JSon or CbPackage as payload. - CbObject/JSon RPC format for `import` and `export` methods: - CbObject RPC format for `getchunks` method, returns CbPackage with the found chunks, if all chunks are found the number of attachments matches number of chunks requested. - Feature: Zen server `{project}/oplog/{log}/{hash}` now accepts `HttpVerb::kPost` as well as `HttpVerb::kGet`. - Feature: Zen command line tool `oplog-export` to export an oplog to an external target using the zenserver oplog export endpoint. - Feature: Zen command line tool `oplog-import` to import an oplog from an external source using the zenserver oplog import endpoint.
-rw-r--r--CHANGELOG.md109
-rw-r--r--zen/cmds/projectstore.cpp509
-rw-r--r--zen/cmds/projectstore.h73
-rw-r--r--zen/zen.cpp7
-rw-r--r--zencore/include/zencore/thread.h9
-rw-r--r--zenserver-test/zenserver-test.cpp546
-rw-r--r--zenserver/auth/oidc.cpp2
-rw-r--r--zenserver/projectstore/fileremoteprojectstore.cpp235
-rw-r--r--zenserver/projectstore/fileremoteprojectstore.h19
-rw-r--r--zenserver/projectstore/jupiterremoteprojectstore.cpp244
-rw-r--r--zenserver/projectstore/jupiterremoteprojectstore.h26
-rw-r--r--zenserver/projectstore/projectstore.cpp721
-rw-r--r--zenserver/projectstore/projectstore.h63
-rw-r--r--zenserver/projectstore/remoteprojectstore.cpp1036
-rw-r--r--zenserver/projectstore/remoteprojectstore.h111
-rw-r--r--zenserver/projectstore/zenremoteprojectstore.cpp341
-rw-r--r--zenserver/projectstore/zenremoteprojectstore.h18
-rw-r--r--zenserver/upstream/jupiter.cpp42
-rw-r--r--zenserver/upstream/jupiter.h1
-rw-r--r--zenserver/zenserver.cpp2
-rw-r--r--zenstore/include/zenstore/scrubcontext.h1
-rw-r--r--zenutil/include/zenutil/zenserverprocess.h2
22 files changed, 4025 insertions, 92 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8088d56c3..7f14d73c1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,115 @@
- `--gcpath` Absolute path to oplog lifetime marker file (optional)
- Feature: Build scripts and tooling to build zen compliant with VFX reference platform CY2022/2021 matching UE linux builds
- Feature: added `xmake sln` task which replaces `generate_projects.bat`
+- Feature: Zen server endpoint `prj/{project}/oplog/{log}/chunks` to post multiple attachments in one request.
+- Feature: Zen server endpoint `prj/{project}/oplog/{log}/save` to save an oplog container. Accepts `CbObject` containing a compressed oplog and attachment references organized in blocks.
+- Feature: Zen server endpoint `prj/{project}/oplog/{log}/load` to request an oplog container. Responds with an `CbObject` containing a compressed oplog and attachment references organized in blocks.
+- Feature: Zen server endpoint `{project}/oplog/{log}/rpc` to initiate an import to or export from an external location and other operations. Use either JSon or CbPackage as payload.
+ - CbObject/JSon RPC format for `import` and `export` methods:
+ ```json
+ {
+ "method" : "<method>",
+ "params" : {
+ "maxblocksize": "<maxblocksize>",
+ "maxchunkembedsize": "<maxchunkembedsize>",
+ "file" : {
+ "path" : "<file-system-folder-path>",
+ "name" : "<oplog-file-name>"
+ },
+ "cloud" : {
+ "url" : "<serviceurl>",
+ "namespace" : "<namespace>",
+ "bucket" : "<bucket>",
+ "key" : "<iohash>",
+ "openid-provider" : "<provider-id>",
+ "access-token" : "<access-token>",
+ "disableblocks" : "<disableblocks>",
+ "disabletempblocks" : "<disabletempblocks>"
+ },
+ "zen" : {
+ "url" : "<url>",
+ "project" : "<projectid>",
+ "oplog" : "<oplogid>"
+ }
+ }
+ }
+ ```
+ - `"method"`supported methods are `"export"` and `"import"` to import/export an oplog
+ - `"params"` container for parameters
+ - `"maxblocksize"` - Optional. The maximum size of a block of attachments, default 134217728 (128 Mb) (export only)
+ - `"maxchunkembedsize"` - Optional. The maximum size of an attachment to be put in a block, larger attachments will be stored as usual attachments, default 1048576 (1Mb) (export only)
+ - `"force"` - Optional. Boolean flag to indicate weather attachments should be uploaded/downloaded disregarding prior existance
+ - External location types are "file" (File system), "cloud" (UE Cloud Storage service) or "zen" (Zen server instance), provide one of those as remote location.
+ - `"file"` - Optional. Indicates remote location is the local file system
+ - `"path"` - File system path folder to export to / import from
+ - `"name"` - File name of oplog output, written into <file-system-folder-path>
+ - `"cloud"` - Optional. Indicates remote location is UE Cloud Storage service
+ - `"url"` - Jupiter service endpoint url
+ - `"namespace"` - Name of namespace to store data to
+ - `"bucket"` - Name of bucket to store data to
+ - `"key"` - IoHash key to the stored oplog container
+ - `"openid-provider"` - Optional. Name of openid provider used to authenticate with, requires that the zen server instance has been provided with a oids refresh token for <provider-id>
+ - `"access-token"` - Optional. JWS access token to authenticate with
+ - `"disableblocks"` - Optional. Disable creation of attachments blocks - "true"/"false" (export only)
+ - `"disabletempblocks"` - Optional. Disable creation of attachments temp blocks forcing upload before oplog container - "true"/"false" (export only)
+ - `"zen"` - Optional. Indicates remote location is a Zen server instance
+ - `"url"` - Zen server instance url
+ - `"project"` - The remote project name (id)
+ - `"oplog" - The remote oplog name (id)
+ - CbObject RPC format for `getchunks` method, returns CbPackage with the found chunks, if all chunks are found the number of attachments matches number of chunks requested.
+ ```json
+ {
+ "method" : "getchunks",
+ "chunks" : [
+ "<rawhash>",
+ ]
+ }
+ ```
+ - CbPackage RPC format for `putchunks` method, attachments are stored in CidStore
+ ```json
+ {
+ "method" : "putchunks",
+ }
+ ```
+- Feature: Zen server `{project}/oplog/{log}/{hash}` now accepts `HttpVerb::kPost` as well as `HttpVerb::kGet`.
+- Feature: Zen command line tool `oplog-export` to export an oplog to an external target using the zenserver oplog export endpoint.
+ - `--project` Project name (id)
+ - `--oplog` Project name (id)
+ - `--maxblocksize` The maximum size of a block of attachments (optional)
+ - `--maxchunkembedsize` The maximum size of an attachment to be put in a block, larger attachments will be stored as usual attachments (optional)
+ - `--force` Force upload/download of attachments even if they already exist.
+ - `--file` File system path folder to export to / import from
+ - `--name` File name of oplog output, written into `--file` path
+ - `--disableblocks` Disable block creation and save all attachments individually
+ - `--forcetempblocks` Force creation of temp attachment blocks
+ - `--cloud` Jupiter service endpoint to export to / import from
+ - `namespace` Name of namespace to store data to
+ - `bucket` Name of bucket to store data to
+ - `key` Key to the stored oplog container (If omitted a default key will be generated based on project/oplog/namespace/bucket)
+ - `openid-provider` Optional name of openid provider used to authenticate with, requires that the zen server instance has been provided with a oids refresh token for the provider name
+ - `access-token` Optional JWS access token to authenticate with
+ - `disableblocks` Disable block creation and save all attachments individually
+ - `disabletempblocks` Disable temp block creation and upload blocks without waiting for oplog container to be uploaded
+ - `--zen` Zen server instance url to export to / import from
+ - `--target-project` The remote project name (id) (optional, defaults to same as `project`)
+ - `--taret-oplog` The remote oplog name (id) (optional, defaults to same as `olplog`)
+ - `--clean` Delete and create a new oplog before starting export
+- Feature: Zen command line tool `oplog-import` to import an oplog from an external source using the zenserver oplog import endpoint.
+ - `--project` Project name (id)
+ - `--oplog` Project name (id)
+ - `--force` Force upload/download of attachments even if they already exist.
+ - `--file` File system path folder to export to / import from
+ - `--name` File name of oplog output, written into `--file` path
+ - `--cloud` Jupiter service endpoint to export to / import from
+ - `namespace` Name of namespace to store data to
+ - `bucket` Name of bucket to store data to
+ - `key` Key to the stored oplog container (If omitted a default key will be generated based on project/oplog/namespace/bucket)
+ - `openid-provider` Optional name of openid provider used to authenticate with, requires that the zen server instance has been provided with a oids refresh token for the provider name
+ - `access-token` Optional JWS access token to authenticate with
+ - `--zen` Zen server instance url to export to / import from
+ - `--source-project` The remote project name (id) (optional, defaults to same as `project`)
+ - `--source-oplog` The remote oplog name (id) (optional, defaults to same as `olplog`)
+ - `--clean` Delete and create a new oplog before starting import
- Improvement: Faster oplog replay - reduces time to open an existing oplog
- Improvement: Clearer error messages and logging when requests to project store fails
- Changed: Removed remnants of old mesh experiment
diff --git a/zen/cmds/projectstore.cpp b/zen/cmds/projectstore.cpp
index c53f0bc35..8ecdecaaa 100644
--- a/zen/cmds/projectstore.cpp
+++ b/zen/cmds/projectstore.cpp
@@ -2,28 +2,13 @@
#include "projectstore.h"
-#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compactbinaryvalue.h>
-#include <zencore/compress.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/stream.h>
-#include <zencore/uid.h>
-#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
-#include <zenhttp/httpshared.h>
-#include <zenutil/basicfile.h>
-#include <zenutil/zenserverprocess.h>
-
-#include <memory>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
-#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
///////////////////////////////////////
@@ -32,9 +17,9 @@ DropProjectCommand::DropProjectCommand()
{
m_Options.add_options()("h,help", "Print help");
m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
- m_Options.add_option("", "p", "project", "Namnspace name", cxxopts::value(m_ProjectName), "<projectname>");
- m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogname>");
- m_Options.parse_positional({"{project}", "{oplog}"});
+ m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>");
+ m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>");
+ m_Options.parse_positional({"project", "oplog"});
}
DropProjectCommand::~DropProjectCommand() = default;
@@ -92,9 +77,9 @@ ProjectInfoCommand::ProjectInfoCommand()
{
m_Options.add_options()("h,help", "Print help");
m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
- m_Options.add_option("", "p", "project", "Namnspace name", cxxopts::value(m_ProjectName), "<projectname>");
- m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogname>");
- m_Options.parse_positional({"{project}", "{oplog}"});
+ m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>");
+ m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>");
+ m_Options.parse_positional({"project", "oplog"});
}
ProjectInfoCommand::~ProjectInfoCommand() = default;
@@ -279,3 +264,485 @@ CreateOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
return GetReturnCode(Response);
}
+
+///////////////////////////////////////
+
+ExportOplogCommand::ExportOplogCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
+ m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>");
+ m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>");
+ m_Options.add_option("", "", "maxblocksize", "Max size for bundled attachments", cxxopts::value(m_MaxBlockSize), "<blocksize>");
+ m_Options.add_option("",
+ "",
+ "maxchunkembedsize",
+ "Max size for attachment to be bundled",
+ cxxopts::value(m_MaxChunkEmbedSize),
+ "<chunksize>");
+ m_Options.add_option("", "f", "force", "Force export of all attachments", cxxopts::value(m_Force), "<force>");
+ m_Options.add_option("",
+ "",
+ "disableblocks",
+ "Disable block creation and save all attachments individually (applies to file and cloud target)",
+ cxxopts::value(m_DisableBlocks),
+ "<disable>");
+
+ m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
+ m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>");
+ m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>");
+ m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>");
+ m_Options
+ .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>");
+ m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>");
+ m_Options.add_option("cloud",
+ "",
+ "disabletempblocks",
+ "Disable temp block creation and upload blocks without waiting for oplog container to be uploaded",
+ cxxopts::value(m_CloudDisableTempBlocks),
+ "<disable>");
+
+ m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>");
+ m_Options.add_option("zen", "", "target-project", "Zen target project name", cxxopts::value(m_ZenProjectName), "<targetprojectid>");
+ m_Options.add_option("zen", "", "target-oplog", "Zen target oplog name", cxxopts::value(m_ZenOplogName), "<targetoplogid>");
+ m_Options.add_option("zen", "", "clean", "Delete existing target Zen oplog", cxxopts::value(m_ZenClean), "<clean>");
+
+ m_Options.add_option("", "", "file", "Local folder path", cxxopts::value(m_FileDirectoryPath), "<path>");
+ m_Options.add_option("file", "", "name", "Local file name", cxxopts::value(m_FileName), "<filename>");
+ m_Options.add_option("file",
+ "",
+ "forcetempblocks",
+ "Force creation of temp attachment blocks",
+ cxxopts::value(m_FileForceEnableTempBlocks),
+ "<forcetempblocks>");
+
+ m_Options.parse_positional({"project", "oplog"});
+}
+
+ExportOplogCommand::~ExportOplogCommand() = default;
+
+int
+ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ using namespace std::literals;
+
+ ZEN_UNUSED(GlobalOptions);
+
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ if (m_ProjectName.empty())
+ {
+ ZEN_ERROR("Project name must be given");
+ return 1;
+ }
+
+ if (m_OplogName.empty())
+ {
+ ZEN_ERROR("Oplog name must be given");
+ return 1;
+ }
+
+ size_t TargetCount = 0;
+ TargetCount += m_CloudUrl.empty() ? 0 : 1;
+ TargetCount += m_ZenUrl.empty() ? 0 : 1;
+ TargetCount += m_FileDirectoryPath.empty() ? 0 : 1;
+ if (TargetCount != 1)
+ {
+ ZEN_ERROR("Provide one target only");
+ ZEN_CONSOLE("{}", m_Options.help({""}).c_str());
+ return 1;
+ }
+
+ cpr::Session Session;
+
+ if (!m_CloudUrl.empty())
+ {
+ if (m_CloudNamespace.empty() || m_CloudBucket.empty())
+ {
+ ZEN_ERROR("Options for cloud target are missing");
+ ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str());
+ return 1;
+ }
+ if (m_CloudKey.empty())
+ {
+ std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket);
+ zen::IoHash Key = zen::IoHash::HashBuffer(KeyString.data(), KeyString.size());
+ m_CloudKey = Key.ToHexString();
+ ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey);
+ }
+ }
+
+ if (!m_ZenUrl.empty())
+ {
+ if (m_ZenProjectName.empty())
+ {
+ m_ZenProjectName = m_ProjectName;
+ ZEN_WARN("Using default zen target project id '{}'", m_ZenProjectName);
+ }
+ if (m_ZenOplogName.empty())
+ {
+ m_ZenOplogName = m_OplogName;
+ ZEN_WARN("Using default zen target oplog id '{}'", m_ZenOplogName);
+ }
+
+ std::string TargetUrlBase = fmt::format("{}/prj", m_ZenUrl);
+ if (TargetUrlBase.find("://") == std::string::npos)
+ {
+ // Assume https URL
+ TargetUrlBase = fmt::format("http://{}", TargetUrlBase);
+ }
+
+ Session.SetUrl({fmt::format("{}/{}/oplog/{}", TargetUrlBase, m_ZenProjectName, m_ZenOplogName)});
+ cpr::Response Response = Session.Get();
+ if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound))
+ {
+ ZEN_WARN("Automatically creating oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName)
+ Response = Session.Post();
+ if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+ }
+ }
+ else if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+ }
+ else if (m_ZenClean)
+ {
+ ZEN_WARN("Cleaning oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName)
+ Response = Session.Delete();
+ if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+ }
+ Response = Session.Post();
+ if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+ }
+ }
+ }
+
+ if (!m_FileDirectoryPath.empty())
+ {
+ if (m_FileName.empty())
+ {
+ m_FileName = m_OplogName;
+ ZEN_WARN("Using default file name '{}'", m_FileName);
+ }
+ }
+
+ const std::string SourceUrlBase = fmt::format("{}/prj", m_HostName);
+ std::string TargetDescription;
+ Session.SetUrl({fmt::format("{}/{}/oplog/{}/rpc", SourceUrlBase, m_ProjectName, m_OplogName)});
+ Session.SetHeader({{"Content-Type", std::string(zen::MapContentTypeToString(zen::HttpContentType::kCbObject))}});
+ zen::CbObjectWriter Writer;
+ Writer.AddString("method"sv, "export"sv);
+ Writer.BeginObject("params"sv);
+ {
+ if (m_MaxBlockSize != 0)
+ {
+ Writer.AddInteger("maxblocksize"sv, m_MaxBlockSize);
+ }
+ if (m_MaxChunkEmbedSize != 0)
+ {
+ Writer.AddInteger("maxchunkembedsize"sv, m_MaxChunkEmbedSize);
+ }
+ if (m_Force)
+ {
+ Writer.AddBool("force"sv, true);
+ }
+ if (!m_FileDirectoryPath.empty())
+ {
+ Writer.BeginObject("file"sv);
+ {
+ Writer.AddString("file"sv, m_FileDirectoryPath);
+ Writer.AddString("name"sv, m_FileName);
+ if (m_DisableBlocks)
+ {
+ Writer.AddBool("disableblocks"sv, true);
+ }
+ if (m_FileForceEnableTempBlocks)
+ {
+ Writer.AddBool("enabletempblocks"sv, true);
+ }
+ }
+ Writer.EndObject(); // "file"
+ TargetDescription = fmt::format("[file] '{}/{}'", m_FileDirectoryPath, m_FileName);
+ }
+ if (!m_CloudUrl.empty())
+ {
+ Writer.BeginObject("cloud"sv);
+ {
+ Writer.AddString("url"sv, m_CloudUrl);
+ Writer.AddString("namespace"sv, m_CloudNamespace);
+ Writer.AddString("bucket"sv, m_CloudBucket);
+ Writer.AddString("key"sv, m_CloudKey);
+ if (!m_CloudOpenIdProvider.empty())
+ {
+ Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider);
+ }
+ if (!m_CloudAccessToken.empty())
+ {
+ Writer.AddString("access-token"sv, m_CloudAccessToken);
+ }
+ if (m_DisableBlocks)
+ {
+ Writer.AddBool("disableblocks"sv, true);
+ }
+ if (m_CloudDisableTempBlocks)
+ {
+ Writer.AddBool("disabletempblocks"sv, true);
+ }
+ }
+ Writer.EndObject(); // "cloud"
+ TargetDescription = fmt::format("[cloud] '{}/{}/{}/{}'", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey);
+ }
+ if (!m_ZenUrl.empty())
+ {
+ Writer.BeginObject("zen"sv);
+ {
+ Writer.AddString("url"sv, m_ZenUrl);
+ Writer.AddString("project"sv, m_ZenProjectName);
+ Writer.AddString("oplog"sv, m_ZenOplogName);
+ }
+ Writer.EndObject(); // "zen"
+
+ TargetDescription = fmt::format("[zen] '{}/{}/{}'", m_ZenUrl, m_ZenProjectName, m_ZenOplogName);
+ }
+ }
+ Writer.EndObject(); // "params"
+
+ zen::BinaryWriter MemOut;
+ Writer.Save(MemOut);
+ Session.SetBody(cpr::Body{(const char*)MemOut.GetData(), MemOut.GetSize()});
+
+ ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription);
+ cpr::Response Response = Session.Post();
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+}
+
+////////////////////////////
+
+ImportOplogCommand::ImportOplogCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
+ m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>");
+ m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>");
+ m_Options.add_option("", "", "maxblocksize", "Max size for bundled attachments", cxxopts::value(m_MaxBlockSize), "<blocksize>");
+ m_Options.add_option("",
+ "",
+ "maxchunkembedsize",
+ "Max size for attachment to be bundled",
+ cxxopts::value(m_MaxChunkEmbedSize),
+ "<chunksize>");
+ m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>");
+
+ m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
+ m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>");
+ m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>");
+ m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>");
+ m_Options
+ .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>");
+ m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>");
+
+ m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>");
+ m_Options.add_option("zen", "", "source-project", "Zen source project name", cxxopts::value(m_ZenProjectName), "<sourceprojectid>");
+ m_Options.add_option("zen", "", "source-oplog", "Zen source oplog name", cxxopts::value(m_ZenOplogName), "<sourceoplogid>");
+ m_Options.add_option("zen", "", "clean", "Delete existing target Zen oplog", cxxopts::value(m_ZenClean), "<clean>");
+
+ m_Options.add_option("", "", "file", "Local folder path", cxxopts::value(m_FileDirectoryPath), "<path>");
+ m_Options.add_option("file", "", "name", "Local file name", cxxopts::value(m_FileName), "<filename>");
+
+ m_Options.parse_positional({"project", "oplog"});
+}
+
+ImportOplogCommand::~ImportOplogCommand() = default;
+
+int
+ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ using namespace std::literals;
+
+ ZEN_UNUSED(GlobalOptions);
+
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ if (m_ProjectName.empty())
+ {
+ ZEN_ERROR("Project name must be given");
+ return 1;
+ }
+
+ if (m_OplogName.empty())
+ {
+ ZEN_ERROR("Oplog name must be given");
+ return 1;
+ }
+
+ size_t TargetCount = 0;
+ TargetCount += m_CloudUrl.empty() ? 0 : 1;
+ TargetCount += m_ZenUrl.empty() ? 0 : 1;
+ TargetCount += m_FileDirectoryPath.empty() ? 0 : 1;
+ if (TargetCount != 1)
+ {
+ ZEN_ERROR("Provide one source only");
+ ZEN_CONSOLE("{}", m_Options.help({""}).c_str());
+ return 1;
+ }
+
+ cpr::Session Session;
+
+ if (!m_CloudUrl.empty())
+ {
+ if (m_CloudNamespace.empty() || m_CloudBucket.empty())
+ {
+ ZEN_ERROR("Options for cloud source are missing");
+ ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str());
+ return 1;
+ }
+ if (m_CloudKey.empty())
+ {
+ std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket);
+ zen::IoHash Key = zen::IoHash::HashBuffer(KeyString.data(), KeyString.size());
+ m_CloudKey = Key.ToHexString();
+ ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey);
+ }
+ }
+
+ if (!m_ZenUrl.empty())
+ {
+ if (m_ZenProjectName.empty())
+ {
+ m_ZenProjectName = m_ProjectName;
+ ZEN_WARN("Using default zen target project id '{}'", m_ZenProjectName);
+ }
+ if (m_ZenOplogName.empty())
+ {
+ m_ZenOplogName = m_OplogName;
+ ZEN_WARN("Using default zen target oplog id '{}'", m_ZenOplogName);
+ }
+ }
+
+ if (!m_FileDirectoryPath.empty())
+ {
+ if (m_FileName.empty())
+ {
+ m_FileName = m_OplogName;
+ ZEN_WARN("Using auto generated file name '{}'", m_FileName);
+ }
+ }
+
+ const std::string TargetUrlBase = fmt::format("{}/prj", m_HostName);
+ Session.SetUrl({fmt::format("{}/{}/oplog/{}", TargetUrlBase, m_ProjectName, m_OplogName)});
+ cpr::Response Response = Session.Get();
+ if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound))
+ {
+ ZEN_WARN("Automatically creating oplog '{}/{}'", m_ProjectName, m_OplogName)
+ Response = Session.Post();
+ if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+ }
+ }
+ else if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+ }
+ else if (m_ZenClean)
+ {
+ ZEN_WARN("Cleaning oplog '{}/{}'", m_ProjectName, m_OplogName)
+ Response = Session.Delete();
+ if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+ }
+ Response = Session.Post();
+ if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+ }
+ }
+
+ std::string SourceDescription;
+ Session.SetUrl(fmt::format("{}/{}/oplog/{}/rpc", TargetUrlBase, m_ProjectName, m_OplogName));
+ Session.SetHeader({{"Content-Type", std::string(zen::MapContentTypeToString(zen::HttpContentType::kCbObject))}});
+
+ zen::CbObjectWriter Writer;
+ Writer.AddString("method"sv, "import"sv);
+ Writer.BeginObject("params"sv);
+ {
+ if (m_Force)
+ {
+ Writer.AddBool("force"sv, true);
+ }
+ if (!m_FileDirectoryPath.empty())
+ {
+ Writer.BeginObject("file"sv);
+ {
+ Writer.AddString("file"sv, m_FileDirectoryPath);
+ Writer.AddString("name"sv, m_FileName);
+ }
+ Writer.EndObject(); // "file"
+ SourceDescription = fmt::format("[file] '{}/{}'", m_FileDirectoryPath, m_FileName);
+ }
+ if (!m_CloudUrl.empty())
+ {
+ Writer.BeginObject("cloud"sv);
+ {
+ Writer.AddString("url"sv, m_CloudUrl);
+ Writer.AddString("namespace"sv, m_CloudNamespace);
+ Writer.AddString("bucket"sv, m_CloudBucket);
+ Writer.AddString("key"sv, m_CloudKey);
+ if (!m_CloudOpenIdProvider.empty())
+ {
+ Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider);
+ }
+ if (!m_CloudAccessToken.empty())
+ {
+ Writer.AddString("access-token"sv, m_CloudAccessToken);
+ }
+ }
+ Writer.EndObject(); // "cloud"
+ SourceDescription = fmt::format("[cloud] '{}/{}/{}/{}'", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey);
+ }
+ if (!m_ZenUrl.empty())
+ {
+ Writer.BeginObject("zen"sv);
+ {
+ Writer.AddString("url"sv, m_ZenUrl);
+ Writer.AddString("project"sv, m_ZenProjectName);
+ Writer.AddString("oplog"sv, m_ZenOplogName);
+ }
+ Writer.EndObject(); // "zen"
+ SourceDescription = fmt::format("[zen] '{}'", m_ZenUrl);
+ }
+ }
+ Writer.EndObject(); // "params"
+
+ zen::BinaryWriter MemOut;
+ Writer.Save(MemOut);
+ Session.SetBody(cpr::Body{(const char*)MemOut.GetData(), MemOut.GetSize()});
+
+ ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName);
+ Response = Session.Post();
+
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+}
diff --git a/zen/cmds/projectstore.h b/zen/cmds/projectstore.h
index 73cba8f66..af4da548f 100644
--- a/zen/cmds/projectstore.h
+++ b/zen/cmds/projectstore.h
@@ -70,3 +70,76 @@ private:
std::string m_OplogId;
std::string m_GcPath;
};
+
+class ExportOplogCommand : public ZenCmdBase
+{
+public:
+ ExportOplogCommand();
+ ~ExportOplogCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{"oplog-export",
+ "Export project store oplog to cloud (--cloud), file system (--file) or other Zen instance (--zen)"};
+ std::string m_HostName;
+ std::string m_ProjectName;
+ std::string m_OplogName;
+ uint64_t m_MaxBlockSize = 0;
+ uint64_t m_MaxChunkEmbedSize = 0;
+ bool m_Force = false;
+ bool m_DisableBlocks = false;
+
+ std::string m_CloudUrl;
+ std::string m_CloudNamespace;
+ std::string m_CloudBucket;
+ std::string m_CloudKey;
+ std::string m_CloudOpenIdProvider;
+ std::string m_CloudAccessToken;
+ bool m_CloudDisableTempBlocks = false;
+
+ std::string m_ZenUrl;
+ std::string m_ZenProjectName;
+ std::string m_ZenOplogName;
+ bool m_ZenClean;
+
+ std::string m_FileDirectoryPath;
+ std::string m_FileName;
+ bool m_FileForceEnableTempBlocks = false;
+};
+
+class ImportOplogCommand : public ZenCmdBase
+{
+public:
+ ImportOplogCommand();
+ ~ImportOplogCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{"oplog-import",
+ "Import project store oplog from cloud (--cloud), file system (--file) or other Zen instance (--zen)"};
+ std::string m_HostName;
+ std::string m_ProjectName;
+ std::string m_OplogName;
+ size_t m_MaxBlockSize = 0;
+ size_t m_MaxChunkEmbedSize = 0;
+ bool m_Force = false;
+
+ std::string m_CloudUrl;
+ std::string m_CloudNamespace;
+ std::string m_CloudBucket;
+ std::string m_CloudKey;
+ std::string m_CloudOpenIdProvider;
+ std::string m_CloudAccessToken;
+
+ std::string m_ZenUrl;
+ std::string m_ZenProjectName;
+ std::string m_ZenOplogName;
+ bool m_ZenClean;
+
+ std::string m_FileDirectoryPath;
+ std::string m_FileName;
+};
diff --git a/zen/zen.cpp b/zen/zen.cpp
index 2b6a529fe..9d85680d1 100644
--- a/zen/zen.cpp
+++ b/zen/zen.cpp
@@ -22,7 +22,6 @@
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
-#include <zencore/zencore.h>
#include <zenhttp/httpcommon.h>
@@ -69,7 +68,7 @@ ZenCmdBase::ParseOptions(int argc, char** argv)
cxxopts::ParseResult Result = CmdOptions.parse(argc, argv);
if (Result.count("help"))
{
- printf("%s\n", CmdOptions.help({}).c_str());
+ printf("%s\n", CmdOptions.help().c_str());
return false;
}
if (!Result.unmatched().empty())
@@ -215,6 +214,8 @@ main(int argc, char** argv)
PsCommand PsCmd;
UpCommand UpCmd;
DownCommand DownCmd;
+ ExportOplogCommand ExportOplogCmd;
+ ImportOplogCommand ImportOplogCmd;
VersionCommand VersionCmd;
CacheInfoCommand CacheInfoCmd;
DropProjectCommand ProjectDropCmd;
@@ -253,6 +254,8 @@ main(int argc, char** argv)
{"project-info", &ProjectInfoCmd, "Info on project or project oplog"},
{"project-create", &CreateProjectCmd, "Create a project"},
{"oplog-create", &CreateOplogCmd, "Create a project oplog"},
+ {"oplog-export", &ExportOplogCmd, "Export project store oplog"},
+ {"oplog-import", &ImportOplogCmd, "Import project store oplog"},
{"gc", &GcCmd, "Garbage collect zen storage"},
{"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"},
#if ZEN_WITH_TESTS
diff --git a/zencore/include/zencore/thread.h b/zencore/include/zencore/thread.h
index 2aad22061..a9c96d422 100644
--- a/zencore/include/zencore/thread.h
+++ b/zencore/include/zencore/thread.h
@@ -163,6 +163,15 @@ public:
std::ptrdiff_t Remaining() const { return Counter.load(); }
+ // If you want to add dynamic count, make sure to set the initial counter to 1
+ // and then do a CountDown() just before wait to not trigger the event causing
+ // false positive completion results.
+ void AddCount(std::ptrdiff_t Count)
+ {
+ std::atomic_ptrdiff_t Old = Counter.fetch_add(Count);
+ ZEN_ASSERT_SLOW(Old > 0);
+ }
+
bool Wait(int TimeoutMs = -1)
{
std::ptrdiff_t Old = Counter.load();
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 5c7e150ce..d8b135f74 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -15,8 +15,10 @@
#include <zencore/refcount.h>
#include <zencore/stream.h>
#include <zencore/string.h>
+#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
+#include <zencore/xxhash.h>
#include <zenhttp/httpclient.h>
#include <zenhttp/httpshared.h>
#include <zenhttp/websocket.h>
@@ -49,6 +51,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <random>
#include <span>
#include <thread>
+#include <typeindex>
#include <unordered_map>
#if ZEN_PLATFORM_WINDOWS
@@ -2323,12 +2326,12 @@ public:
ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {}
~ZenServerTestHelper() {}
- void SpawnServers()
+ void SpawnServers(std::string_view AdditionalServerArgs = std::string_view())
{
- SpawnServers([](ZenServerInstance&) {});
+ SpawnServers([](ZenServerInstance&) {}, AdditionalServerArgs);
}
- void SpawnServers(auto&& Callback)
+ void SpawnServers(auto&& Callback, std::string_view AdditionalServerArgs)
{
ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount);
@@ -2343,7 +2346,7 @@ public:
Callback(*Instance);
- Instance->SpawnServer(13337 + i);
+ Instance->SpawnServer(13337 + i, AdditionalServerArgs);
}
for (int i = 0; i < m_ServerCount; ++i)
@@ -2530,6 +2533,541 @@ TEST_CASE("websocket.basic")
IoDispatcher.Stop();
}
+std::string
+OidAsString(const Oid& Id)
+{
+ StringBuilder<25> OidStringBuilder;
+ Id.ToString(OidStringBuilder);
+ return OidStringBuilder.ToString();
+}
+
+CbPackage
+CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer> >& Attachments)
+{
+ CbPackage Package;
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("bulkdata");
+ for (const auto& Attachment : Attachments)
+ {
+ CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
+ Object.EndObject();
+
+ Package.AddAttachment(Attach);
+ ZEN_DEBUG("Added attachment {}", Attach.GetHash());
+ }
+ Object.EndArray();
+ }
+ Package.SetObject(Object.Save());
+ return Package;
+};
+
+std::vector<std::pair<Oid, CompressedBuffer> >
+CreateAttachments(const std::span<const size_t>& Sizes)
+{
+ std::vector<std::pair<Oid, CompressedBuffer> > Result;
+ Result.reserve(Sizes.size());
+ for (size_t Size : Sizes)
+ {
+ std::vector<uint8_t> Data;
+ Data.resize(Size);
+ uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
+ for (size_t Idx = 0; Idx < Size / 2; ++Idx)
+ {
+ DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu);
+ }
+ if (Size & 1)
+ {
+ Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff);
+ }
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
+ Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
+ }
+ return Result;
+}
+
+cpr::Body
+AsBody(const IoBuffer& Payload)
+{
+ return cpr::Body{(const char*)Payload.GetData(), Payload.Size()};
+};
+
+enum CbWriterMeta
+{
+ BeginObject,
+ EndObject,
+ BeginArray,
+ EndArray
+};
+
+inline CbWriter&
+operator<<(CbWriter& Writer, CbWriterMeta Meta)
+{
+ switch (Meta)
+ {
+ case BeginObject:
+ Writer.BeginObject();
+ break;
+ case EndObject:
+ Writer.EndObject();
+ break;
+ case BeginArray:
+ Writer.BeginArray();
+ break;
+ case EndArray:
+ Writer.EndArray();
+ break;
+ default:
+ ZEN_ASSERT(false);
+ }
+ return Writer;
+}
+
+TEST_CASE("project.remote")
+{
+ using namespace std::literals;
+
+ ZenServerTestHelper Servers("remote", 3);
+ Servers.SpawnServers("--debug");
+
+ std::vector<Oid> OpIds;
+ OpIds.reserve(24);
+ for (size_t I = 0; I < 24; ++I)
+ {
+ OpIds.emplace_back(Oid::NewOid());
+ }
+
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer> >, Oid::Hasher> Attachments;
+ {
+ std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269,
+ 2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759,
+ 1916, 8210, 2235, 4024, 1582, 5251, 491, 5464, 4607, 8135, 3767, 4045,
+ 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225});
+ auto It = AttachmentSizes.begin();
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[4]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[5]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[6]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[7]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[8]] = CreateAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[9]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[10]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[11]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[12]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
+ Attachments[OpIds[13]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[14]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[15]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[16]] = CreateAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[17]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[18]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++});
+ Attachments[OpIds[19]] = CreateAttachments(std::initializer_list<size_t>{});
+ Attachments[OpIds[20]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[21]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ Attachments[OpIds[22]] = CreateAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
+ Attachments[OpIds[23]] = CreateAttachments(std::initializer_list<size_t>{*It++});
+ ZEN_ASSERT(It == AttachmentSizes.end());
+ }
+
+ auto AddOp = [](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) {
+ XXH3_128Stream KeyHasher;
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash = KeyHasher.GetHash();
+ Oid Id;
+ memcpy(Id.OidBits, &KeyHash, sizeof Id.OidBits);
+ IoBuffer Buffer = Op.GetBuffer().AsIoBuffer();
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF);
+ Ops.insert({Id, OpCoreHash});
+ };
+
+ auto MakeProject = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName) {
+ CbObjectWriter Project;
+ Project.AddString("id"sv, ProjectName);
+ Project.AddString("root"sv, ""sv);
+ Project.AddString("engine"sv, ""sv);
+ Project.AddString("project"sv, ""sv);
+ Project.AddString("projectfile"sv, ""sv);
+ IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer();
+ std::string ProjectRequest = fmt::format("{}/prj/{}", UrlBase, ProjectName);
+ Session.SetUrl({ProjectRequest});
+ Session.SetBody(cpr::Body{(const char*)ProjectPayload.GetData(), ProjectPayload.GetSize()});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ };
+
+ auto MakeOplog = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) {
+ std::string CreateOplogRequest = fmt::format("{}/prj/{}/oplog/{}", UrlBase, ProjectName, OplogName);
+ Session.SetUrl({CreateOplogRequest});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ };
+
+ auto MakeOp = [](cpr::Session& Session,
+ std::string_view UrlBase,
+ std::string_view ProjectName,
+ std::string_view OplogName,
+ const CbPackage& OpPackage) {
+ std::string CreateOpRequest = fmt::format("{}/prj/{}/oplog/{}/new", UrlBase, ProjectName, OplogName);
+ Session.SetUrl({CreateOpRequest});
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
+ Session.SetBody(cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ };
+
+ cpr::Session Session;
+ MakeProject(Session, Servers.GetInstance(0).GetBaseUri(), "proj0");
+ MakeOplog(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
+
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
+ for (const Oid& OpId : OpIds)
+ {
+ CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]);
+ CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size());
+ AddOp(OpPackage.GetObject(), SourceOps);
+ MakeOp(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage);
+ }
+
+ std::vector<IoHash> AttachmentHashes;
+ AttachmentHashes.reserve(Attachments.size());
+ for (const auto& AttachmentOplog : Attachments)
+ {
+ for (const auto& Attachment : AttachmentOplog.second)
+ {
+ AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash());
+ }
+ }
+
+ auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer {
+ CbObjectWriter Writer;
+ Write(Writer);
+ IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer();
+ Result.MakeOwned();
+ return Result;
+ };
+
+ auto ValidateAttachments = [&MakeCbObjectPayload, &AttachmentHashes, &Servers, &Session](int ServerIndex,
+ std::string_view Project,
+ std::string_view Oplog) {
+ std::string GetChunksRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog);
+ Session.SetUrl({GetChunksRequest});
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "getchunks"sv;
+ Writer << "chunks"sv << BeginArray;
+ for (const IoHash& Chunk : AttachmentHashes)
+ {
+ Writer << Chunk;
+ }
+ Writer << EndArray; // chunks
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ CbPackage ResponsePackage = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
+ CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size());
+ };
+
+ auto ValidateOplog = [&SourceOps, &AddOp, &Servers, &Session](int ServerIndex, std::string_view Project, std::string_view Oplog) {
+ std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps;
+ std::vector<CbObject> ResultingOplog;
+
+ std::string GetOpsRequest =
+ fmt::format("{}/prj/{}/oplog/{}/entries", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog);
+ Session.SetUrl({GetOpsRequest});
+ cpr::Response Response = Session.Get();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+
+ IoBuffer Payload(IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ CbObject OplogResonse = LoadCompactBinaryObject(Payload);
+ CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView();
+
+ for (CbFieldView OpEntry : EntriesArray)
+ {
+ CbObjectView Core = OpEntry.AsObjectView();
+ BinaryWriter Writer;
+ Core.CopyTo(Writer);
+ MemoryView OpView = Writer.GetView();
+ IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
+ CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
+ AddOp(Op, TargetOps);
+ }
+ CHECK(SourceOps == TargetOps);
+ };
+
+ SUBCASE("File")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
+ Session.SetUrl({SaveOplogRequest});
+
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << path;
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ {
+ MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
+ MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ std::string LoadOplogRequest =
+ fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ Session.SetUrl({LoadOplogRequest});
+
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << path;
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ }
+
+ SUBCASE("File disable blocks")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
+ Session.SetUrl({SaveOplogRequest});
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ Writer << "disableblocks"sv << true;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ {
+ MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
+ MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ std::string LoadOplogRequest =
+ fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ Session.SetUrl({LoadOplogRequest});
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ }
+
+ SUBCASE("File force temp blocks")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
+ Session.SetUrl({SaveOplogRequest});
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ Writer << "enabletempblocks"sv << true;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ {
+ MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
+ MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ std::string LoadOplogRequest =
+ fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ Session.SetUrl({LoadOplogRequest});
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "file"sv << BeginObject;
+ {
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ }
+
+ SUBCASE("Zen")
+ {
+ ScopedTemporaryDirectory TempDir;
+ {
+ std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri();
+ std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
+ MakeProject(Session, ExportTargetUri, "proj0_copy");
+ MakeOplog(Session, ExportTargetUri, "proj0_copy", "oplog0_copy");
+
+ std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ExportSourceUri, "proj0", "oplog0");
+ Session.SetUrl({SaveOplogRequest});
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "export"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "maxblocksize"sv << 3072u;
+ Writer << "maxchunkembedsize"sv << 1296u;
+ Writer << "force"sv << false;
+ Writer << "zen"sv << BeginObject;
+ {
+ Writer << "url"sv << ExportTargetUri.substr(7);
+ Writer << "project"
+ << "proj0_copy";
+ Writer << "oplog"
+ << "oplog0_copy";
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ ValidateAttachments(1, "proj0_copy", "oplog0_copy");
+ ValidateOplog(1, "proj0_copy", "oplog0_copy");
+
+ {
+ std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
+ std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
+ MakeProject(Session, ImportTargetUri, "proj1");
+ MakeOplog(Session, ImportTargetUri, "proj1", "oplog1");
+ std::string LoadOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ImportTargetUri, "proj1", "oplog1");
+ Session.SetUrl({LoadOplogRequest});
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer << "method"sv
+ << "import"sv;
+ Writer << "params" << BeginObject;
+ {
+ Writer << "force"sv << false;
+ Writer << "zen"sv << BeginObject;
+ {
+ Writer << "url"sv << ImportSourceUri.substr(7);
+ Writer << "project"
+ << "proj0_copy";
+ Writer << "oplog"
+ << "oplog0_copy";
+ }
+ Writer << EndObject; // "file"
+ }
+ Writer << EndObject; // "params"
+ });
+ Session.SetBody(AsBody(Payload));
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
+ cpr::Response Response = Session.Post();
+ CHECK(IsHttpSuccessCode(Response.status_code));
+ }
+ ValidateAttachments(2, "proj1", "oplog1");
+ ValidateOplog(2, "proj1", "oplog1");
+ }
+}
+
# if 0
TEST_CASE("lifetime.owner")
{
diff --git a/zenserver/auth/oidc.cpp b/zenserver/auth/oidc.cpp
index 17b5bac08..d2265c22f 100644
--- a/zenserver/auth/oidc.cpp
+++ b/zenserver/auth/oidc.cpp
@@ -104,7 +104,7 @@ OidcClient::RefreshToken(std::string_view RefreshToken)
if (Response.status_code != 200)
{
- return {.Reason = std::move(Response.reason)};
+ return {.Reason = fmt::format("{} ({})", Response.reason, Response.text)};
}
std::string JsonError;
diff --git a/zenserver/projectstore/fileremoteprojectstore.cpp b/zenserver/projectstore/fileremoteprojectstore.cpp
new file mode 100644
index 000000000..d7a34a6c2
--- /dev/null
+++ b/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -0,0 +1,235 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "fileremoteprojectstore.h"
+
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/timer.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+class LocalExportProjectStore : public RemoteProjectStore
+{
+public:
+ LocalExportProjectStore(std::string_view Name,
+ const std::filesystem::path& FolderPath,
+ bool ForceDisableBlocks,
+ bool ForceEnableTempBlocks)
+ : m_Name(Name)
+ , m_OutputPath(FolderPath)
+ {
+ if (ForceDisableBlocks)
+ {
+ m_EnableBlocks = false;
+ }
+ if (ForceEnableTempBlocks)
+ {
+ m_UseTempBlocks = true;
+ }
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = m_EnableBlocks,
+ .UseTempBlockFiles = m_UseTempBlocks,
+ .Description = fmt::format("[file] {}"sv, m_OutputPath)};
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ Stopwatch Timer;
+ SaveResult Result;
+
+ {
+ CbObject ContainerObject = LoadCompactBinaryObject(Payload);
+
+ ContainerObject.IterateAttachments([&](CbFieldView FieldView) {
+ IoHash AttachmentHash = FieldView.AsBinaryAttachment();
+ std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash);
+ if (!std::filesystem::exists(AttachmentPath))
+ {
+ Result.Needs.insert(AttachmentHash);
+ }
+ });
+ }
+
+ std::filesystem::path ContainerPath = m_OutputPath;
+ ContainerPath.append(m_Name);
+
+ CreateDirectories(m_OutputPath);
+ BasicFile ContainerFile;
+ ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ ContainerFile.WriteAll(Payload, Ec);
+ if (Ec)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = Ec.message();
+ }
+ Result.RawHash = IoHash::HashBuffer(Payload);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+ SaveAttachmentResult Result;
+ std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
+ if (!std::filesystem::exists(ChunkPath))
+ {
+ try
+ {
+ CreateDirectories(ChunkPath.parent_path());
+
+ BasicFile ChunkFile;
+ ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate);
+ size_t Offset = 0;
+ for (const SharedBuffer& Segment : Payload.GetSegments())
+ {
+ ChunkFile.Write(Segment.GetView(), Offset);
+ Offset += Segment.GetSize();
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = Ex.what();
+ }
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ Stopwatch Timer;
+
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
+ SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash());
+ if (ChunkResult.ErrorCode)
+ {
+ ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return SaveAttachmentsResult{ChunkResult};
+ }
+ }
+ SaveAttachmentsResult Result;
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual Result FinalizeContainer(const IoHash&) override { return {}; }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ Stopwatch Timer;
+ LoadContainerResult Result;
+ std::filesystem::path ContainerPath = m_OutputPath;
+ ContainerPath.append(m_Name);
+ if (!std::filesystem::is_regular_file(ContainerPath))
+ {
+ Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
+ Result.Reason = fmt::format("The file {} does not exist"sv, ContainerPath.string());
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ IoBuffer ContainerPayload;
+ {
+ BasicFile ContainerFile;
+ ContainerFile.Open(ContainerPath, BasicFile::Mode::kRead);
+ ContainerPayload = ContainerFile.ReadAll();
+ }
+ Result.ContainerObject = LoadCompactBinaryObject(ContainerPayload);
+ if (!Result.ContainerObject)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("The file {} is not formatted as a compact binary object"sv, ContainerPath.string());
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+ LoadAttachmentResult Result;
+ std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
+ if (!std::filesystem::is_regular_file(ChunkPath))
+ {
+ Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
+ Result.Reason = fmt::format("The file {} does not exist"sv, ChunkPath.string());
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ {
+ BasicFile ChunkFile;
+ ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
+ Result.Bytes = ChunkFile.ReadAll();
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ Stopwatch Timer;
+ LoadAttachmentsResult Result;
+ for (const IoHash& Hash : RawHashes)
+ {
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
+ if (ChunkResult.ErrorCode)
+ {
+ ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return LoadAttachmentsResult{ChunkResult};
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
+ }
+ return Result;
+ }
+
+private:
+ std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const
+ {
+ ExtendablePathBuilder<128> ShardedPath;
+ ShardedPath.Append(m_OutputPath.c_str());
+ ExtendableStringBuilder<64> HashString;
+ RawHash.ToHexString(HashString);
+ const char* str = HashString.c_str();
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str, str + 3);
+
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str + 3, str + 5);
+
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str + 5, str + 40);
+
+ return ShardedPath.ToPath();
+ }
+
+ const std::string m_Name;
+ const std::filesystem::path m_OutputPath;
+ bool m_EnableBlocks = true;
+ bool m_UseTempBlocks = false;
+};
+
+std::unique_ptr<RemoteProjectStore>
+CreateFileRemoteStore(const FileRemoteStoreOptions& Options)
+{
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name,
+ std::filesystem::path(Options.FolderPath),
+ Options.ForceDisableBlocks,
+ Options.ForceEnableTempBlocks);
+ return RemoteStore;
+}
+
+} // namespace zen
diff --git a/zenserver/projectstore/fileremoteprojectstore.h b/zenserver/projectstore/fileremoteprojectstore.h
new file mode 100644
index 000000000..68d1eb71e
--- /dev/null
+++ b/zenserver/projectstore/fileremoteprojectstore.h
@@ -0,0 +1,19 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "remoteprojectstore.h"
+
+namespace zen {
+
+struct FileRemoteStoreOptions : RemoteStoreOptions
+{
+ std::filesystem::path FolderPath;
+ std::string Name;
+ bool ForceDisableBlocks;
+ bool ForceEnableTempBlocks;
+};
+
+std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options);
+
+} // namespace zen
diff --git a/zenserver/projectstore/jupiterremoteprojectstore.cpp b/zenserver/projectstore/jupiterremoteprojectstore.cpp
new file mode 100644
index 000000000..66cf3c4f8
--- /dev/null
+++ b/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -0,0 +1,244 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "jupiterremoteprojectstore.h"
+
+#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+
+#include <auth/authmgr.h>
+#include <upstream/jupiter.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+class JupiterRemoteStore : public RemoteProjectStore
+{
+public:
+ JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& Key,
+ bool ForceDisableBlocks,
+ bool ForceDisableTempBlocks)
+ : m_CloudClient(CloudClient)
+ , m_Namespace(Namespace)
+ , m_Bucket(Bucket)
+ , m_Key(Key)
+ {
+ if (ForceDisableBlocks)
+ {
+ m_EnableBlocks = false;
+ }
+ if (ForceDisableTempBlocks)
+ {
+ m_UseTempBlocks = false;
+ }
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = m_EnableBlocks,
+ .UseTempBlockFiles = m_UseTempBlocks,
+ .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key)};
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ const int32_t MaxAttempts = 3;
+ PutRefResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
+ }
+ }
+
+ return SaveResult{ConvertResult(Result), {Result.Needs.begin(), Result.Needs.end()} /*, {}*/, IoHash::HashBuffer(Payload)};
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ {
+ const int32_t MaxAttempts = 3;
+ CloudCacheResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
+ }
+ }
+
+ return SaveAttachmentResult{ConvertResult(Result)};
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ SaveAttachmentsResult Result;
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
+ SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash());
+ if (ChunkResult.ErrorCode)
+ {
+ return SaveAttachmentsResult{ChunkResult};
+ }
+ }
+ return Result;
+ }
+
+ virtual Result FinalizeContainer(const IoHash& RawHash) override
+ {
+ const int32_t MaxAttempts = 3;
+ CloudCacheResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
+ }
+ }
+ return ConvertResult(Result);
+ }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ const int32_t MaxAttempts = 3;
+ CloudCacheResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.GetRef(m_Namespace, m_Bucket, m_Key, ZenContentType::kCbObject);
+ }
+ }
+
+ if (Result.ErrorCode || !Result.Success)
+ {
+ return LoadContainerResult{ConvertResult(Result)};
+ }
+
+ CbObject ContainerObject = LoadCompactBinaryObject(Result.Response);
+ if (!ContainerObject)
+ {
+ return LoadContainerResult{
+ RemoteProjectStore::Result{
+ .ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Reason = fmt::format("The ref {}/{}/{} is not formatted as a compact binary object"sv, m_Namespace, m_Bucket, m_Key)},
+ std::move(ContainerObject)};
+ }
+
+ return LoadContainerResult{ConvertResult(Result), std::move(ContainerObject)};
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ const int32_t MaxAttempts = 3;
+ CloudCacheResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.GetCompressedBlob(m_Namespace, RawHash);
+ }
+ }
+ return LoadAttachmentResult{ConvertResult(Result), std::move(Result.Response)};
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ LoadAttachmentsResult Result;
+ for (const IoHash& Hash : RawHashes)
+ {
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
+ if (ChunkResult.ErrorCode)
+ {
+ return LoadAttachmentsResult{ChunkResult};
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
+ }
+ return Result;
+ }
+
+private:
+ static Result ConvertResult(const CloudCacheResult& Response)
+ {
+ std::string Text;
+ int32_t ErrorCode = 0;
+ if (Response.ErrorCode != 0)
+ {
+ ErrorCode = Response.ErrorCode;
+ }
+ else if (!Response.Success)
+ {
+ ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ if (Response.Response.GetContentType() == ZenContentType::kText)
+ {
+ Text =
+ std::string(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), Response.Response.GetSize());
+ }
+ }
+ return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text};
+ }
+
+ Ref<CloudCacheClient> m_CloudClient;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const IoHash m_Key;
+ bool m_EnableBlocks = true;
+ bool m_UseTempBlocks = true;
+};
+
+std::unique_ptr<RemoteProjectStore>
+CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options)
+{
+ std::string Url = Options.Url;
+ if (Url.find("://"sv) == std::string::npos)
+ {
+ // Assume https URL
+ Url = fmt::format("https://{}"sv, Url);
+ }
+ CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv,
+ .ServiceUrl = Url,
+ .ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(60000)};
+ // 1) Access token as parameter in request
+ // 2) Environment variable (different win vs linux/mac)
+ // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
+
+ std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+ if (!Options.AccessToken.empty())
+ {
+ TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = Options.AccessToken]() {
+ return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()};
+ });
+ }
+ else
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() {
+ AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider);
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
+ }
+
+ Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider)));
+
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient),
+ Options.Namespace,
+ Options.Bucket,
+ Options.Key,
+ Options.ForceDisableBlocks,
+ Options.ForceDisableTempBlocks);
+ return RemoteStore;
+}
+
+} // namespace zen
diff --git a/zenserver/projectstore/jupiterremoteprojectstore.h b/zenserver/projectstore/jupiterremoteprojectstore.h
new file mode 100644
index 000000000..31548af22
--- /dev/null
+++ b/zenserver/projectstore/jupiterremoteprojectstore.h
@@ -0,0 +1,26 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "remoteprojectstore.h"
+
+namespace zen {
+
+class AuthMgr;
+
+struct JupiterRemoteStoreOptions : RemoteStoreOptions
+{
+ std::string Url;
+ std::string Namespace;
+ std::string Bucket;
+ IoHash Key;
+ std::string OpenIdProvider;
+ std::string AccessToken;
+ AuthMgr& AuthManager;
+ bool ForceDisableBlocks;
+ bool ForceDisableTempBlocks;
+};
+
+std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options);
+
+} // namespace zen
diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp
index a07698d50..82aac1605 100644
--- a/zenserver/projectstore/projectstore.cpp
+++ b/zenserver/projectstore/projectstore.cpp
@@ -5,32 +5,31 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compactbinaryvalue.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
-#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
#include <zenhttp/httpshared.h>
#include <zenstore/caslog.h>
+#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
-#include <zenutil/basicfile.h>
-#include "config.h"
-
-#include <latch>
+#include "fileremoteprojectstore.h"
+#include "jupiterremoteprojectstore.h"
+#include "remoteprojectstore.h"
+#include "zenremoteprojectstore.h"
ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
#include <xxh3.h>
ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -69,6 +68,149 @@ namespace {
Sleep(100);
} while (true);
}
+
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize)
+ {
+ using namespace std::literals;
+
+ std::unique_ptr<RemoteProjectStore> RemoteStore;
+
+ if (CbObjectView File = Params["file"sv].AsObjectView(); File)
+ {
+ std::filesystem::path FolderPath(File["path"sv].AsString());
+ if (FolderPath.empty())
+ {
+ return {nullptr, "Missing file path"};
+ }
+ std::string_view Name(File["name"sv].AsString());
+ if (Name.empty())
+ {
+ return {nullptr, "Missing file name"};
+ }
+ bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
+ bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
+
+ FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ FolderPath,
+ std::string(Name),
+ ForceDisableBlocks,
+ ForceEnableTempBlocks};
+ RemoteStore = CreateFileRemoteStore(Options);
+ }
+
+ if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
+ {
+ std::string_view CloudServiceUrl = Cloud["url"sv].AsString();
+ if (CloudServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = cpr::util::urlDecode(std::string(CloudServiceUrl));
+ std::string_view Namespace = Cloud["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Cloud["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Cloud["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+#if PLATFORM_WINDOWS
+
+ CHAR EnvVariableBuffer[1023 + 1];
+ DWORD RESULT = GetEnvironmentVariableA("UE-CloudDataCacheAccessToken", EnvVariableBuffer, sizeof(EnvVariableBuffer));
+ if (RESULT > 0 && RESULT < sizeof(EnvVariableBuffer))
+ {
+ AccessToken = std::string(EnvVariableBuffer);
+ }
+#endif
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ char* EnvVariable = getenv("UE_CloudDataCacheAccessToken");
+ if (EnvVariable)
+ {
+ AccessToken = std::string(EnvVariable);
+ }
+#endif
+ }
+ std::string_view KeyParam = Cloud["key"sv].AsString();
+ if (KeyParam.empty())
+ {
+ return {nullptr, "Missing key"};
+ }
+ if (KeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid key"};
+ }
+ IoHash Key = IoHash::FromHexString(KeyParam);
+ if (Key == IoHash::Zero)
+ {
+ return {nullptr, "Invalid key string"};
+ }
+ bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
+
+ JupiterRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ Key,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks};
+ RemoteStore = CreateJupiterRemoteStore(Options);
+ }
+
+ if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
+ {
+ std::string_view Url = Zen["url"sv].AsString();
+ std::string_view Project = Zen["project"sv].AsString();
+ if (Project.empty())
+ {
+ return {nullptr, "Missing project"};
+ }
+ std::string_view Oplog = Zen["oplog"sv].AsString();
+ if (Oplog.empty())
+ {
+ return {nullptr, "Missing oplog"};
+ }
+ ZenRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ std::string(Url),
+ std::string(Project),
+ std::string(Oplog)};
+ RemoteStore = CreateZenRemoteStore(Options);
+ }
+
+ if (!RemoteStore)
+ {
+ return {nullptr, "Unknown remote store type"};
+ }
+
+ return {std::move(RemoteStore), ""};
+ }
+
+ std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
+ {
+ if (Result.ErrorCode == 0)
+ {
+ return {HttpResponseCode::OK, Result.Text};
+ }
+ return {static_cast<HttpResponseCode>(Result.ErrorCode),
+ Result.Reason.empty() ? Result.Text
+ : Result.Text.empty() ? Result.Reason
+ : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)};
+ }
+
} // namespace
//////////////////////////////////////////////////////////////////////////
@@ -794,7 +936,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
XXH3_128 KeyHash = KeyHasher.GetHash();
RefPtr<OplogStorage> Storage;
-
{
RwLock::SharedLockScope _(m_OplogLock);
Storage = m_Storage;
@@ -1613,7 +1754,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
if (!FoundLog)
{
- return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
}
if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
@@ -1636,7 +1777,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
{
IoHash RawHash;
uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize);
ZEN_ASSERT(!Compressed.IsNull());
if (IsOffset)
@@ -1679,7 +1820,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
{
Size = Chunk.GetSize() - Offset;
}
- OutChunk = IoBuffer(Chunk, Offset, Size);
+ OutChunk = IoBuffer(std::move(Chunk), Offset, Size);
OutChunk.SetContentType(ContentType);
}
@@ -1693,8 +1834,6 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
ZenContentType AcceptType,
IoBuffer& OutChunk)
{
- using namespace std::literals;
-
Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
if (!Project)
{
@@ -1705,7 +1844,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
if (!FoundLog)
{
- return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
}
if (Cid.length() != IoHash::StringLength)
@@ -1721,25 +1860,401 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)};
}
- if (AcceptType == HttpContentType::kBinary)
+ if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary)
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk));
OutChunk = Compressed.Decompress().AsIoBuffer();
- OutChunk.SetContentType(HttpContentType::kBinary);
+ OutChunk.SetContentType(ZenContentType::kBinary);
}
else
{
- OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ OutChunk.SetContentType(ZenContentType::kCompressedBinary);
}
return {HttpResponseCode::OK, {}};
}
+std::pair<HttpResponseCode, std::string>
+ProjectStore::PutChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view Cid,
+ ZenContentType ContentType,
+ IoBuffer&& Chunk)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ if (Cid.length() != IoHash::StringLength)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk put request for invalid chunk hash '{}'", Cid)};
+ }
+
+ const IoHash Hash = IoHash::FromHexString(Cid);
+
+ if (ContentType != HttpContentType::kCompressedBinary)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid content type for chunk '{}'", Cid)};
+ }
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+ if (RawHash != Hash)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid payload format for chunk '{}'", Cid)};
+ }
+
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash);
+ return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ CbObject ContainerObject = LoadCompactBinaryObject(Payload);
+ if (!ContainerObject)
+ {
+ return {HttpResponseCode::BadRequest, "Invalid payload format"};
+ }
+
+ CidStore& ChunkStore = m_CidStore;
+ RwLock AttachmentsLock;
+ std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+
+ auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); };
+ auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ if (BlockHash != IoHash::Zero)
+ {
+ Attachments.insert(BlockHash);
+ }
+ else
+ {
+ Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
+ }
+ };
+ auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ Attachments.insert(RawHash);
+ };
+
+ RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+
+ if (RemoteResult.ErrorCode)
+ {
+ return ConvertResult(RemoteResult);
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+ {
+ for (const IoHash& Hash : Attachments)
+ {
+ ZEN_DEBUG("Need attachment {}", Hash);
+ Cbo << Hash;
+ }
+ }
+ Cbo.EndArray(); // "need"
+
+ OutResponse = Cbo.Save();
+ return {HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::ReadOplog(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const HttpServerRequest::QueryParams& Params,
+ CbObject& OutResponse)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ size_t MaxBlockSize = 128u * 1024u * 1024u;
+ if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ MaxBlockSize = Value.value();
+ }
+ }
+ size_t MaxChunkEmbedSize = 1024u * 1024u;
+ if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ MaxChunkEmbedSize = Value.value();
+ }
+ }
+
+ CidStore& ChunkStore = m_CidStore;
+
+ RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
+ ChunkStore,
+ *Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ false,
+ [](CompressedBuffer&&, const IoHash) {},
+ [](const IoHash&) {},
+ [](const std::unordered_set<IoHash, IoHash::Hasher>) {});
+
+ OutResponse = std::move(ContainerResult.ContainerObject);
+ return ConvertResult(ContainerResult);
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ m_CidStore.AddChunk(Compressed, AttachmentRawHash);
+ ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize());
+ }))
+ {
+ return {HttpResponseCode::BadRequest, "Invalid chunk in block"};
+ }
+
+ return {HttpResponseCode::OK, {}};
+}
+
+void
+ProjectStore::Rpc(HttpServerRequest& HttpReq,
+ const std::string_view ProjectId,
+ const std::string_view OplogId,
+ IoBuffer&& Payload,
+ AuthMgr& AuthManager)
+{
+ using namespace std::literals;
+ HttpContentType PayloadContentType = HttpReq.RequestContentType();
+ CbPackage Package;
+ CbObject Cb;
+ switch (PayloadContentType)
+ {
+ case HttpContentType::kJSON:
+ case HttpContentType::kUnknownContentType:
+ case HttpContentType::kText:
+ {
+ std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
+ Cb = LoadCompactBinaryFromJson(JsonText).AsObject();
+ if (!Cb)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected JSON format");
+ }
+ }
+ break;
+ case HttpContentType::kCbObject:
+ Cb = LoadCompactBinaryObject(Payload);
+ if (!Cb)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected compact binary format");
+ }
+ break;
+ case HttpContentType::kCbPackage:
+ Package = ParsePackageMessage(Payload);
+ Cb = Package.GetObject();
+ if (!Cb)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected package message format");
+ }
+ break;
+ default:
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type");
+ }
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown project '{}'", ProjectId));
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+
+ std::string_view Method = Cb["method"sv].AsString();
+
+ if (Method == "import")
+ {
+ std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ else if (Method == "export")
+ {
+ std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ else if (Method == "getchunks")
+ {
+ CbPackage ResponsePackage;
+ {
+ CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView();
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("chunks"sv);
+ for (CbFieldView FieldView : ChunksArray)
+ {
+ IoHash RawHash = FieldView.AsHash();
+ IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash);
+ if (ChunkBuffer)
+ {
+ ResponseWriter.AddHash(RawHash);
+ ResponsePackage.AddAttachment(
+ CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)), RawHash));
+ }
+ }
+ ResponseWriter.EndArray();
+ ResponsePackage.SetObject(ResponseWriter.Save());
+ }
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else if (Method == "putchunks")
+ {
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ CompressedBuffer Compressed = Attachment.AsCompressedBinary();
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
+{
+ using namespace std::literals;
+
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ bool Force = Params["force"sv].AsBool(false);
+
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
+ CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
+
+ if (RemoteStoreResult.first == nullptr)
+ {
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ }
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
+ Project.Identifier,
+ Oplog.OplogId(),
+ StoreInfo.Description,
+ NiceBytes(MaxBlockSize),
+ NiceBytes(MaxChunkEmbedSize));
+
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *RemoteStore,
+ Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ StoreInfo.CreateBlocks,
+ StoreInfo.UseTempBlockFiles,
+ Force);
+
+ return ConvertResult(Result);
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
+{
+ using namespace std::literals;
+
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ bool Force = Params["force"sv].AsBool(false);
+
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
+ CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
+
+ if (RemoteStoreResult.first == nullptr)
+ {
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ }
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
+ RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force);
+ return ConvertResult(Result);
+}
+
//////////////////////////////////////////////////////////////////////////
-HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
+HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, AuthMgr& AuthMgr)
: m_Log(logging::Get("project"))
, m_CidStore(Store)
, m_ProjectStore(Projects)
+, m_AuthMgr(AuthMgr)
{
using namespace std::literals;
@@ -1918,7 +2433,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
static_cast<int>(Result.first),
Result.second);
}
-
return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
},
HttpVerb::kGet);
@@ -2023,33 +2537,60 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- const auto& Cid = Req.GetCapture(3);
- HttpContentType AcceptType = HttpReq.AcceptContentType();
-
- IoBuffer Value;
- std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value);
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& Cid = Req.GetCapture(3);
+ HttpContentType AcceptType = HttpReq.AcceptContentType();
+ HttpContentType RequestType = HttpReq.RequestContentType();
- if (Result.first == HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value);
- }
- else if (Result.first == HttpResponseCode::NotFound)
- {
- ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid);
- }
- else
+ switch (Req.ServerRequest().RequestVerb())
{
- ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
- ToString(HttpReq.RequestVerb()),
- HttpReq.QueryString(),
- static_cast<int>(Result.first),
- Result.second);
+ case HttpVerb::kGet:
+ {
+ IoBuffer Value;
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value);
+
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value);
+ }
+ else if (Result.first == HttpResponseCode::NotFound)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ static_cast<int>(Result.first),
+ Result.second);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ case HttpVerb::kPost:
+ {
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload());
+ if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created)
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ static_cast<int>(Result.first),
+ Result.second);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ break;
}
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
},
- HttpVerb::kGet);
+ HttpVerb::kGet | HttpVerb::kPost);
m_Router.RegisterRoute(
"{project}/oplog/{log}/prep",
@@ -2556,6 +3097,67 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
}
},
HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete);
+
+ // Push a oplog container
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/save",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ if (HttpReq.RequestContentType() != HttpContentType::kCbObject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type");
+ }
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+
+ CbObject Response;
+ std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ },
+ HttpVerb::kPost);
+
+ // Pull a oplog container
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/load",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ if (HttpReq.AcceptContentType() != HttpContentType::kCbObject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type");
+ }
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+
+ CbObject Response;
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ },
+ HttpVerb::kGet);
+
+ // Do an rpc style operation on project/oplog
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/rpc",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+
+ m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr);
+ },
+ HttpVerb::kPost);
}
HttpProjectService::~HttpProjectService()
@@ -2625,9 +3227,14 @@ namespace testutils {
{
std::vector<uint8_t> Data;
Data.resize(Size);
- for (size_t Idx = 0; Idx < Size; ++Idx)
+ uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
+ for (size_t Idx = 0; Idx < Size / 2; ++Idx)
{
- Data[Idx] = Idx % 255;
+ DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu);
+ }
+ if (Size & 1)
+ {
+ Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff);
}
CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
@@ -2908,6 +3515,28 @@ TEST_CASE("project.store.partial.read")
const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData());
CHECK(FullDataPtr[0] == PartialDataPtr[0]);
}
+
+TEST_CASE("project.store.block")
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489,
+ 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251,
+ 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225});
+
+ std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes);
+ std::vector<SharedBuffer> Chunks;
+ Chunks.reserve(AttachmentSizes.size());
+ for (const auto& It : AttachmentsWithId)
+ {
+ Chunks.push_back(It.second.GetCompressed().Flatten());
+ }
+ CompressedBuffer Block = GenerateBlock(std::move(Chunks));
+ IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
+ CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
+}
+
#endif
void
diff --git a/zenserver/projectstore/projectstore.h b/zenserver/projectstore/projectstore.h
index 6b214d5a2..928a74f59 100644
--- a/zenserver/projectstore/projectstore.h
+++ b/zenserver/projectstore/projectstore.h
@@ -2,18 +2,11 @@
#pragma once
-#include <zencore/logging.h>
#include <zencore/uid.h>
#include <zencore/xxhash.h>
#include <zenhttp/httpserver.h>
-#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
-#include <filesystem>
-#include <map>
-#include <optional>
-#include <string>
-
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -21,6 +14,9 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
class CbPackage;
+class CidStore;
+class AuthMgr;
+class ScrubContext;
struct OplogEntry
{
@@ -141,13 +137,13 @@ public:
std::filesystem::path m_MarkerPath;
std::filesystem::path m_TempPath;
- mutable RwLock m_OplogLock;
- OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address
- OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address
- OidMap<FileMapEntry> m_FileMap; // file id -> file map entry
- int32_t m_ManifestVersion; // File system manifest version
- std::map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file
- OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key
+ mutable RwLock m_OplogLock;
+ OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address
+ OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address
+ OidMap<FileMapEntry> m_FileMap; // file id -> file map entry
+ int32_t m_ManifestVersion; // File system manifest version
+ tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file
+ OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key
RefPtr<OplogStorage> m_Storage;
std::string m_OplogId;
@@ -280,6 +276,42 @@ public:
ZenContentType AcceptType,
IoBuffer& OutChunk);
+ std::pair<HttpResponseCode, std::string> PutChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view Cid,
+ ZenContentType ContentType,
+ IoBuffer&& Chunk);
+
+ std::pair<HttpResponseCode, std::string> WriteOplog(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ IoBuffer&& Payload,
+ CbObject& OutResponse);
+
+ std::pair<HttpResponseCode, std::string> ReadOplog(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const HttpServerRequest::QueryParams& Params,
+ CbObject& OutResponse);
+
+ std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ IoBuffer&& Payload);
+
+ void Rpc(HttpServerRequest& HttpReq,
+ const std::string_view ProjectId,
+ const std::string_view OplogId,
+ IoBuffer&& Payload,
+ AuthMgr& AuthManager);
+
+ std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project,
+ ProjectStore::Oplog& Oplog,
+ CbObjectView&& Params,
+ AuthMgr& AuthManager);
+
+ std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project,
+ ProjectStore::Oplog& Oplog,
+ CbObjectView&& Params,
+ AuthMgr& AuthManager);
+
private:
spdlog::logger& m_Log;
CidStore& m_CidStore;
@@ -312,7 +344,7 @@ private:
class HttpProjectService : public HttpService
{
public:
- HttpProjectService(CidStore& Store, ProjectStore* InProjectStore);
+ HttpProjectService(CidStore& Store, ProjectStore* InProjectStore, AuthMgr& AuthMgr);
~HttpProjectService();
virtual const char* BaseUri() const override;
@@ -325,6 +357,7 @@ private:
CidStore& m_CidStore;
HttpRequestRouter m_Router;
Ref<ProjectStore> m_ProjectStore;
+ AuthMgr& m_AuthMgr;
};
void prj_forcelink();
diff --git a/zenserver/projectstore/remoteprojectstore.cpp b/zenserver/projectstore/remoteprojectstore.cpp
new file mode 100644
index 000000000..1e6ca51a1
--- /dev/null
+++ b/zenserver/projectstore/remoteprojectstore.cpp
@@ -0,0 +1,1036 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "remoteprojectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+#include <zencore/workthreadpool.h>
+#include <zenstore/cidstore.h>
+
+namespace zen {
+
+/*
+ OplogContainer
+ Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller
+ {
+ CbArray("ops")
+ {
+ CbObject Op
+ (CbFieldType::BinaryAttachment Attachments[])
+ (OpData)
+ }
+ }
+ CbArray("blocks")
+ CbObject
+ CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File)
+ CbArray("chunks")
+ CbFieldType::Hash // Chunk hashes
+ CbArray("chunks") // Optional, only if we are not creating blocks (Zen)
+ CbFieldType::BinaryAttachment // Chunk attachment hashes
+
+ CompressedBinary ChunkBlock
+ {
+ VarUInt ChunkCount
+ VarUInt ChunkSizes[ChunkCount]
+ uint8_t[chunksize])[ChunkCount]
+ }
+*/
+
+////////////////////////////// AsyncRemoteResult
+
+struct AsyncRemoteResult
+{
+ void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText)
+ {
+ int32_t Expected = 0;
+ if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1))
+ {
+ m_ErrorReason = ErrorReason;
+ m_ErrorText = ErrorText;
+ }
+ }
+ bool IsError() const { return m_ErrorCode.load() != 0; }
+ int GetError() const { return m_ErrorCode.load(); };
+ const std::string& GetErrorReason() const { return m_ErrorReason; };
+ const std::string& GetErrorText() const { return m_ErrorText; };
+ RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const
+ {
+ return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText};
+ }
+
+private:
+ std::atomic<int32_t> m_ErrorCode = 0;
+ std::string m_ErrorReason;
+ std::string m_ErrorText;
+};
+
+bool
+IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+{
+ IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer();
+
+ MemoryView BlockView = BlockPayload.GetView();
+ const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
+ uint32_t NumberSize;
+ uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize);
+ ReadPtr += NumberSize;
+ std::vector<uint64_t> ChunkSizes;
+ ChunkSizes.reserve(ChunkCount);
+ while (ChunkCount--)
+ {
+ ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize));
+ ReadPtr += NumberSize;
+ }
+ ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr);
+ ZEN_ASSERT(TempBufferLength > 0);
+ for (uint64_t ChunkSize : ChunkSizes)
+ {
+ IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize);
+ IoHash AttachmentRawHash;
+ uint64_t AttachmentRawSize;
+ CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize);
+
+ if (!CompressedChunk)
+ {
+ ZEN_ERROR("Invalid chunk in block");
+ return false;
+ }
+ Visitor(std::move(CompressedChunk), AttachmentRawHash);
+ ReadPtr += ChunkSize;
+ ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd());
+ }
+ return true;
+};
+
+CompressedBuffer
+GenerateBlock(std::vector<SharedBuffer>&& Chunks)
+{
+ size_t ChunkCount = Chunks.size();
+ SharedBuffer SizeBuffer;
+ {
+ IoBuffer TempBuffer(ChunkCount * 9);
+ MutableMemoryView View = TempBuffer.GetMutableView();
+ uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData());
+ uint8_t* BufferEndPtr = BufferStartPtr;
+ BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
+ auto It = Chunks.begin();
+ while (It != Chunks.end())
+ {
+ BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(It->GetSize()), BufferEndPtr);
+ It++;
+ }
+ ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
+ ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
+ SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
+ }
+ CompositeBuffer AllBuffers(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks)));
+
+ CompressedBuffer CompressedBlock =
+ CompressedBuffer::Compress(std::move(AllBuffers), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+
+ return CompressedBlock;
+}
+
+struct Block
+{
+ IoHash BlockHash;
+ std::vector<IoHash> ChunksInBlock;
+};
+
+void
+CreateBlock(WorkerThreadPool& WorkerPool,
+ Latch& OpSectionsLatch,
+ std::vector<SharedBuffer>&& ChunksInBlock,
+ RwLock& SectionsLock,
+ std::vector<Block>& Blocks,
+ size_t BlockIndex,
+ const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ AsyncRemoteResult& RemoteResult)
+{
+ OpSectionsLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable {
+ auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ if (!Chunks.empty())
+ {
+ CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ AsyncOnBlock(std::move(CompressedBlock), BlockHash);
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope __(SectionsLock);
+ Blocks[BlockIndex].BlockHash = BlockHash;
+ }
+ }
+ });
+}
+
+size_t
+AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
+{
+ size_t BlockIndex;
+ {
+ RwLock::ExclusiveLockScope _(BlocksLock);
+ BlockIndex = Blocks.size();
+ Blocks.resize(BlockIndex + 1);
+ }
+ return BlockIndex;
+}
+
+CbObject
+BuildContainer(CidStore& ChunkStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ WorkerThreadPool& WorkerPool,
+ const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ AsyncRemoteResult& RemoteResult)
+{
+ using namespace std::literals;
+
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ CbObjectWriter SectionOpsWriter;
+ SectionOpsWriter.BeginArray("ops"sv);
+
+ size_t OpCount = 0;
+
+ CbObject OplogContainerObject;
+ {
+ RwLock BlocksLock;
+ std::vector<Block> Blocks;
+ CompressedBuffer OpsBuffer;
+
+ Latch BlockCreateLatch(1);
+
+ std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
+
+ size_t BlockSize = 0;
+ std::vector<SharedBuffer> ChunksInBlock;
+
+ std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+ Oplog.IterateOplog([&Attachments, &SectionOpsWriter, &OpCount](CbObject Op) {
+ Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert(FieldView.AsAttachment()); });
+ (SectionOpsWriter) << Op;
+ OpCount++;
+ });
+
+ for (const IoHash& AttachmentHash : Attachments)
+ {
+ IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash);
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {} for op", AttachmentHash),
+ {});
+ ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason());
+ return {};
+ }
+ uint64_t PayloadSize = Payload.GetSize();
+ if (PayloadSize > MaxChunkEmbedSize)
+ {
+ if (LargeChunkHashes.insert(AttachmentHash).second)
+ {
+ OnLargeAttachment(AttachmentHash);
+ }
+ continue;
+ }
+
+ if (!BlockAttachmentHashes.insert(AttachmentHash).second)
+ {
+ continue;
+ }
+
+ BlockSize += PayloadSize;
+ if (BuildBlocks)
+ {
+ ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
+ }
+ else
+ {
+ Payload = {};
+ }
+
+ if (BlockSize >= MaxBlockSize)
+ {
+ size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ if (BuildBlocks)
+ {
+ CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ }
+ else
+ {
+ OnBlockChunks(BlockAttachmentHashes);
+ }
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
+ }
+ BlockAttachmentHashes.clear();
+ ChunksInBlock.clear();
+ BlockSize = 0;
+ }
+ }
+ if (BlockSize > 0)
+ {
+ size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ if (BuildBlocks)
+ {
+ CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ }
+ else
+ {
+ OnBlockChunks(BlockAttachmentHashes);
+ }
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
+ }
+ BlockAttachmentHashes.clear();
+ ChunksInBlock.clear();
+ BlockSize = 0;
+ }
+ SectionOpsWriter.EndArray(); // "ops"
+
+ CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
+ ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize()));
+
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining());
+ }
+
+ if (!RemoteResult.IsError())
+ {
+ CbObjectWriter OplogContinerWriter;
+ RwLock::SharedLockScope _(BlocksLock);
+ OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
+
+ OplogContinerWriter.BeginArray("blocks"sv);
+ {
+ for (const Block& B : Blocks)
+ {
+ ZEN_ASSERT(!B.ChunksInBlock.empty());
+ if (BuildBlocks)
+ {
+ ZEN_ASSERT(B.BlockHash != IoHash::Zero);
+
+ OplogContinerWriter.BeginObject();
+ {
+ OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash);
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : B.ChunksInBlock)
+ {
+ OplogContinerWriter.AddHash(RawHash);
+ }
+ }
+ OplogContinerWriter.EndArray(); // "chunks"
+ }
+ OplogContinerWriter.EndObject();
+ continue;
+ }
+
+ ZEN_ASSERT(B.BlockHash == IoHash::Zero);
+ OplogContinerWriter.BeginObject();
+ {
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : B.ChunksInBlock)
+ {
+ OplogContinerWriter.AddBinaryAttachment(RawHash);
+ }
+ }
+ OplogContinerWriter.EndArray();
+ }
+ OplogContinerWriter.EndObject();
+ }
+ }
+ OplogContinerWriter.EndArray(); // "blocks"sv
+
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& AttachmentHash : LargeChunkHashes)
+ {
+ OplogContinerWriter.AddBinaryAttachment(AttachmentHash);
+ }
+ }
+ OplogContinerWriter.EndArray(); // "chunks"
+
+ OplogContainerObject = OplogContinerWriter.Save();
+ }
+ }
+ return OplogContainerObject;
+}
+
+RemoteProjectStore::LoadContainerResult
+BuildContainer(CidStore& ChunkStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks)
+{
+ // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a
+ // WorkerThreadPool alive
+ size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
+ WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
+
+ AsyncRemoteResult RemoteResult;
+ CbObject ContainerObject = BuildContainer(ChunkStore,
+ Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ BuildBlocks,
+ WorkerPool,
+ AsyncOnBlock,
+ OnLargeAttachment,
+ OnBlockChunks,
+ RemoteResult);
+ return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
+}
+
+RemoteProjectStore::Result
+SaveOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ bool UseTempBlocks,
+ bool ForceUpload)
+{
+ using namespace std::literals;
+
+ Stopwatch Timer;
+
+ // We are creating a worker thread pool here since we are uploading a lot of attachments in one go
+ // Doing upload is a rare and transient occation so we don't want to keep a WorkerThreadPool alive.
+ size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
+ WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
+
+ std::filesystem::path AttachmentTempPath;
+ if (UseTempBlocks)
+ {
+ AttachmentTempPath = Oplog.TempPath();
+ AttachmentTempPath.append(".pending");
+ CreateDirectories(AttachmentTempPath);
+ }
+
+ AsyncRemoteResult RemoteResult;
+ RwLock AttachmentsLock;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
+
+ auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
+ const IoHash& BlockHash) {
+ std::filesystem::path BlockPath = AttachmentTempPath;
+ BlockPath.append(BlockHash.ToHexString());
+ if (!std::filesystem::exists(BlockPath))
+ {
+ IoBuffer BlockBuffer;
+ try
+ {
+ BasicFile BlockFile;
+ BlockFile.Open(BlockPath, BasicFile::Mode::kTruncateDelete);
+ uint64_t Offset = 0;
+ for (const SharedBuffer& Buffer : CompressedBlock.GetCompressed().GetSegments())
+ {
+ BlockFile.Write(Buffer.GetView(), Offset);
+ Offset += Buffer.GetSize();
+ }
+ void* FileHandle = BlockFile.Detach();
+ BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset);
+ }
+ catch (std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ Ex.what(),
+ "Unable to create temp block file");
+ return;
+ }
+
+ BlockBuffer.MarkAsDeleteOnClose();
+ {
+ RwLock::ExclusiveLockScope __(AttachmentsLock);
+ CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
+ }
+ ZEN_DEBUG("Saved temp block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
+ }
+ };
+
+ auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
+ RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError());
+ return;
+ }
+ ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
+ };
+
+ std::vector<std::vector<IoHash>> BlockChunks;
+ auto OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) {
+ BlockChunks.push_back({Chunks.begin(), Chunks.end()});
+ ZEN_DEBUG("Found {} block chunks", Chunks.size());
+ };
+
+ auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) {
+ {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ LargeAttachments.insert(AttachmentHash);
+ }
+ ZEN_DEBUG("Found attachment {}", AttachmentHash);
+ };
+
+ std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock;
+ if (UseTempBlocks)
+ {
+ OnBlock = MakeTempBlock;
+ }
+ else
+ {
+ OnBlock = UploadBlock;
+ }
+
+ CbObject OplogContainerObject = BuildContainer(ChunkStore,
+ Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ BuildBlocks,
+ WorkerPool,
+ OnBlock,
+ OnLargeAttachment,
+ OnBlockChunks,
+ RemoteResult);
+
+ if (!RemoteResult.IsError())
+ {
+ uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
+ uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
+ ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount);
+ RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
+ if (ContainerSaveResult.ErrorCode)
+ {
+ RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
+ ZEN_ERROR("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError());
+ }
+ ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000)));
+ if (!ContainerSaveResult.Needs.empty())
+ {
+ ZEN_INFO("Filtering needed attachments...");
+ std::vector<IoHash> NeededLargeAttachments;
+ std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments;
+ NeededLargeAttachments.reserve(LargeAttachments.size());
+ NeededOtherAttachments.reserve(CreatedBlocks.size());
+ if (ForceUpload)
+ {
+ NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end());
+ }
+ else
+ {
+ for (const IoHash& RawHash : ContainerSaveResult.Needs)
+ {
+ if (LargeAttachments.contains(RawHash))
+ {
+ NeededLargeAttachments.push_back(RawHash);
+ continue;
+ }
+ NeededOtherAttachments.insert(RawHash);
+ }
+ }
+
+ Latch SaveAttachmentsLatch(1);
+ if (!NeededLargeAttachments.empty())
+ {
+ ZEN_INFO("Saving large attachments...");
+ for (const IoHash& RawHash : NeededLargeAttachments)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ SaveAttachmentsLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
+ IoBuffer Payload;
+ if (auto It = CreatedBlocks.find(RawHash); It != CreatedBlocks.end())
+ {
+ Payload = std::move(It->second);
+ }
+ else
+ {
+ Payload = ChunkStore.FindChunkByCid(RawHash);
+ }
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
+ RemoteResult.GetErrorReason(),
+ RemoteResult.GetError());
+ return;
+ }
+
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
+ }
+ }
+
+ if (!CreatedBlocks.empty())
+ {
+ ZEN_INFO("Saving created block attachments...");
+ for (auto& It : CreatedBlocks)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ const IoHash& RawHash = It.first;
+ if (ForceUpload || NeededOtherAttachments.contains(RawHash))
+ {
+ IoBuffer Payload = It.second;
+ ZEN_ASSERT(Payload);
+ SaveAttachmentsLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
+ }
+ It.second = {};
+ }
+ }
+
+ if (!BlockChunks.empty())
+ {
+ ZEN_INFO("Saving chunk block attachments...");
+ for (const std::vector<IoHash>& Chunks : BlockChunks)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ std::vector<IoHash> NeededChunks;
+ if (ForceUpload)
+ {
+ NeededChunks = Chunks;
+ }
+ else
+ {
+ NeededChunks.reserve(Chunks.size());
+ for (const IoHash& Chunk : Chunks)
+ {
+ if (NeededOtherAttachments.contains(Chunk))
+ {
+ NeededChunks.push_back(Chunk);
+ }
+ }
+ if (NeededChunks.empty())
+ {
+ continue;
+ }
+ }
+ SaveAttachmentsLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ &Chunks,
+ NeededChunks = std::move(NeededChunks),
+ ForceUpload]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
+ {
+ IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk);
+ if (!ChunkPayload)
+ {
+ RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Missing chunk {}"sv, Chunk),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+ ChunkBuffers.clear();
+ break;
+ }
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload)));
+ }
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Saved {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ });
+ }
+ }
+ SaveAttachmentsLatch.CountDown();
+ while (!SaveAttachmentsLatch.Wait(1000))
+ {
+ ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining());
+ }
+ SaveAttachmentsLatch.Wait();
+ }
+
+ if (!RemoteResult.IsError())
+ {
+ ZEN_INFO("Finalizing oplog container...");
+ RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
+ if (ContainerFinalizeResult.ErrorCode)
+ {
+ RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
+ ZEN_ERROR("Failed to finalize oplog container {} ({}). Reason: '{}'",
+ ContainerSaveResult.RawHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ }
+ ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000)));
+ }
+ }
+
+ RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ ZEN_INFO("Saved oplog {} in {}",
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return Result;
+};
+
+RemoteProjectStore::Result
+SaveOplogContainer(ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment)
+{
+ using namespace std::literals;
+
+ Stopwatch Timer;
+
+ CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+ for (CbFieldView LargeChunksField : LargeChunksArray)
+ {
+ IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
+ if (HasAttachment(AttachmentHash))
+ {
+ continue;
+ }
+ OnNeedAttachment(AttachmentHash);
+ };
+
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+ for (CbFieldView BlockField : BlocksArray)
+ {
+ CbObjectView BlockView = BlockField.AsObjectView();
+ IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
+
+ CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
+ if (BlockHash == IoHash::Zero)
+ {
+ std::vector<IoHash> NeededChunks;
+ NeededChunks.reserve(ChunksArray.GetSize());
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ IoHash ChunkHash = ChunkField.AsBinaryAttachment();
+ if (HasAttachment(ChunkHash))
+ {
+ continue;
+ }
+ NeededChunks.emplace_back(ChunkHash);
+ }
+
+ if (!NeededChunks.empty())
+ {
+ OnNeedBlock(IoHash::Zero, std::move(NeededChunks));
+ }
+ continue;
+ }
+
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ IoHash ChunkHash = ChunkField.AsHash();
+ if (HasAttachment(ChunkHash))
+ {
+ continue;
+ }
+
+ OnNeedBlock(BlockHash, {});
+ break;
+ }
+ };
+
+ MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
+ IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
+ IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
+
+ CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
+ if (!SectionObject)
+ {
+ ZEN_ERROR("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type");
+ return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
+ Timer.GetElapsedTimeMs() / 1000.500,
+ "Section has unexpected data type",
+ "Failed to save oplog container"};
+ }
+
+ CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ for (CbFieldView OpEntry : OpsArray)
+ {
+ CbObjectView Core = OpEntry.AsObjectView();
+ BinaryWriter Writer;
+ Core.CopyTo(Writer);
+ MemoryView OpView = Writer.GetView();
+ IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
+ CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
+ const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op);
+ if (OpLsn == ProjectStore::Oplog::kInvalidOp)
+ {
+ return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
+ Timer.GetElapsedTimeMs() / 1000.500,
+ "Failed saving op",
+ "Failed to save oplog container"};
+ }
+ ZEN_DEBUG("oplog entry #{}", OpLsn);
+ }
+ return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500};
+}
+
+RemoteProjectStore::Result
+LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload)
+{
+ using namespace std::literals;
+
+ Stopwatch Timer;
+
+ // We are creating a worker thread pool here since we are download a lot of attachments in one go and we dont want to keep a
+ // WorkerThreadPool alive
+ size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
+ WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
+
+ std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+ std::vector<std::vector<IoHash>> ChunksInBlocks;
+
+ RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
+ if (LoadContainerResult.ErrorCode)
+ {
+ ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode);
+ return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Reason = LoadContainerResult.Reason,
+ .Text = LoadContainerResult.Text};
+ }
+ ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)));
+
+ AsyncRemoteResult RemoteResult;
+ Latch AttachmentsWorkLatch(1);
+
+ auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
+ return !ForceDownload && ChunkStore.ContainsChunk(RawHash);
+ };
+ auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult](
+ const IoHash& BlockHash,
+ std::vector<IoHash>&& Chunks) {
+ if (BlockHash == IoHash::Zero)
+ {
+ AttachmentsWorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
+ RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to attachments with {} chunks ({}). Reason: '{}'",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ for (const auto& It : Result.Chunks)
+ {
+ ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly);
+ }
+ });
+ return;
+ }
+ AttachmentsWorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ ZEN_ERROR("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)));
+
+ if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
+ }))
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Invalid format for block {}", BlockHash),
+ {});
+ ZEN_ERROR("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ });
+ };
+
+ auto OnNeedAttachment =
+ [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) {
+ if (!Attachments.insert(RawHash).second)
+ {
+ return;
+ }
+
+ AttachmentsWorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode);
+ return;
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
+ ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
+ });
+ };
+
+ RemoteProjectStore::Result Result =
+ SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+
+ AttachmentsWorkLatch.CountDown();
+ while (!AttachmentsWorkLatch.Wait(1000))
+ {
+ ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining());
+ }
+ AttachmentsWorkLatch.Wait();
+ if (Result.ErrorCode == 0)
+ {
+ Result = RemoteResult.ConvertResult();
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+
+ ZEN_INFO("Loaded oplog {} in {}",
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)));
+
+ return Result;
+}
+
+} // namespace zen
diff --git a/zenserver/projectstore/remoteprojectstore.h b/zenserver/projectstore/remoteprojectstore.h
new file mode 100644
index 000000000..dcabaedd4
--- /dev/null
+++ b/zenserver/projectstore/remoteprojectstore.h
@@ -0,0 +1,111 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "projectstore.h"
+
+#include <unordered_set>
+
+namespace zen {
+
+class CidStore;
+class WorkerThreadPool;
+
+class RemoteProjectStore
+{
+public:
+ struct Result
+ {
+ int32_t ErrorCode{};
+ double ElapsedSeconds{};
+ std::string Reason;
+ std::string Text;
+ };
+
+ struct SaveResult : public Result
+ {
+ std::unordered_set<IoHash, IoHash::Hasher> Needs;
+ IoHash RawHash;
+ };
+
+ struct SaveAttachmentResult : public Result
+ {
+ };
+
+ struct SaveAttachmentsResult : public Result
+ {
+ };
+
+ struct LoadAttachmentResult : public Result
+ {
+ IoBuffer Bytes;
+ };
+
+ struct LoadContainerResult : public Result
+ {
+ CbObject ContainerObject;
+ };
+
+ struct LoadAttachmentsResult : public Result
+ {
+ std::vector<std::pair<IoHash, CompressedBuffer>> Chunks;
+ };
+
+ struct RemoteStoreInfo
+ {
+ bool CreateBlocks;
+ bool UseTempBlockFiles;
+ std::string Description;
+ };
+
+ virtual ~RemoteProjectStore() {}
+
+ virtual RemoteStoreInfo GetInfo() const = 0;
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0;
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0;
+ virtual Result FinalizeContainer(const IoHash& RawHash) = 0;
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
+
+ virtual LoadContainerResult LoadContainer() = 0;
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0;
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0;
+};
+
+struct RemoteStoreOptions
+{
+ size_t MaxBlockSize = 128u * 1024u * 1024u;
+ size_t MaxChunkEmbedSize = 1024u * 1024u;
+};
+
+RemoteProjectStore::LoadContainerResult BuildContainer(
+ CidStore& ChunkStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks);
+
+RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment);
+
+RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ bool UseTempBlocks,
+ bool ForceUpload);
+
+RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload);
+
+CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks);
+bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
+
+} // namespace zen
diff --git a/zenserver/projectstore/zenremoteprojectstore.cpp b/zenserver/projectstore/zenremoteprojectstore.cpp
new file mode 100644
index 000000000..6ff471ae5
--- /dev/null
+++ b/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -0,0 +1,341 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenremoteprojectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compositebuffer.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+#include <zenhttp/httpshared.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+class ZenRemoteStore : public RemoteProjectStore
+{
+public:
+ ZenRemoteStore(std::string_view HostAddress,
+ std::string_view Project,
+ std::string_view Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize)
+ : m_HostAddress(HostAddress)
+ , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress))
+ , m_Project(Project)
+ , m_Oplog(Oplog)
+ , m_MaxBlockSize(MaxBlockSize)
+ , m_MaxChunkEmbedSize(MaxChunkEmbedSize)
+ {
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)};
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ Session->SetUrl({SaveRequest});
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
+ MemoryView Data(Payload.GetView());
+ Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()});
+ cpr::Response Response = Session->Post();
+ SaveResult Result = SaveResult{ConvertResult(Response)};
+
+ if (Result.ErrorCode)
+ {
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload);
+ if (!ResponseObject)
+ {
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView();
+ for (CbFieldView FieldView : NeedsArray)
+ {
+ IoHash ChunkHash = FieldView.AsHash();
+ Result.Needs.insert(ChunkHash);
+ }
+
+ Result.RawHash = IoHash::HashBuffer(Payload);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
+ Session->SetUrl({SaveRequest});
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
+ cpr::Response Response = Session->Post();
+ SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ Stopwatch Timer;
+
+ CbPackage RequestPackage;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "putchunks"sv);
+ RequestWriter.BeginArray("chunks"sv);
+ {
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize);
+ RequestWriter.AddHash(RawHash);
+ RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash));
+ }
+ }
+ RequestWriter.EndArray(); // "chunks"
+ RequestPackage.SetObject(RequestWriter.Save());
+ }
+ CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault);
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ Session->SetUrl({SaveRequest});
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
+
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
+ cpr::Response Response = Session->Post();
+ SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+
+ CbObject Request;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "getchunks"sv);
+ RequestWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : RawHashes)
+ {
+ RequestWriter.AddHash(RawHash);
+ }
+ }
+ RequestWriter.EndArray(); // "chunks"
+ Request = RequestWriter.Save();
+ }
+ IoBuffer Payload = Request.GetBuffer().AsIoBuffer();
+ Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
+ Session->SetUrl(SaveRequest);
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))},
+ {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
+
+ cpr::Response Response = Session->Post();
+ LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ Result.Chunks.reserve(Attachments.size());
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
+ }
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ };
+
+ virtual Result FinalizeContainer(const IoHash&) override
+ {
+ Stopwatch Timer;
+
+ RwLock::ExclusiveLockScope _(SessionsLock);
+ Sessions.clear();
+ return {.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500};
+ }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ Session->SetUrl(SaveRequest);
+ Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
+ Session->SetParameters(
+ {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}});
+ cpr::Response Response = Session->Get();
+
+ LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size()));
+ if (!Result.ContainerObject)
+ {
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+
+ std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
+ Session->SetUrl({LoadRequest});
+ Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
+ cpr::Response Response = Session->Get();
+ LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+private:
+ std::unique_ptr<cpr::Session> AllocateSession()
+ {
+ RwLock::ExclusiveLockScope _(SessionsLock);
+ if (Sessions.empty())
+ {
+ Sessions.emplace_back(std::make_unique<cpr::Session>());
+ }
+ std::unique_ptr<cpr::Session> Session = std::move(Sessions.back());
+ Sessions.pop_back();
+ return Session;
+ }
+
+ void ReleaseSession(std::unique_ptr<cpr::Session>&& Session)
+ {
+ RwLock::ExclusiveLockScope _(SessionsLock);
+ Sessions.emplace_back(std::move(Session));
+ }
+
+ static Result ConvertResult(const cpr::Response& Response)
+ {
+ std::string Text;
+ std::string Reason = Response.reason;
+ int32_t ErrorCode = 0;
+ if (Response.error.code != cpr::ErrorCode::OK)
+ {
+ ErrorCode = static_cast<int32_t>(Response.error.code);
+ if (!Response.error.message.empty())
+ {
+ Reason = Response.error.message;
+ }
+ }
+ else if (!IsHttpSuccessCode(Response.status_code))
+ {
+ ErrorCode = static_cast<int32_t>(Response.status_code);
+
+ if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
+ {
+ zen::HttpContentType ContentType = zen::ParseContentType(It->second);
+ if (ContentType == zen::HttpContentType::kText)
+ {
+ Text = Response.text;
+ }
+ }
+
+ Reason = fmt::format("{}"sv, Response.status_code);
+ }
+ return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text};
+ }
+
+ RwLock SessionsLock;
+ std::vector<std::unique_ptr<cpr::Session>> Sessions;
+
+ const std::string m_HostAddress;
+ const std::string m_ProjectStoreUrl;
+ const std::string m_Project;
+ const std::string m_Oplog;
+ const size_t m_MaxBlockSize;
+ const size_t m_MaxChunkEmbedSize;
+};
+
+std::unique_ptr<RemoteProjectStore>
+CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
+{
+ std::string Url = Options.Url;
+ if (Url.find("://"sv) == std::string::npos)
+ {
+ // Assume https URL
+ Url = fmt::format("http://{}"sv, Url);
+ }
+ std::unique_ptr<RemoteProjectStore> RemoteStore =
+ std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize);
+ return RemoteStore;
+}
+
+} // namespace zen
diff --git a/zenserver/projectstore/zenremoteprojectstore.h b/zenserver/projectstore/zenremoteprojectstore.h
new file mode 100644
index 000000000..ef9dcad8c
--- /dev/null
+++ b/zenserver/projectstore/zenremoteprojectstore.h
@@ -0,0 +1,18 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "remoteprojectstore.h"
+
+namespace zen {
+
+struct ZenRemoteStoreOptions : RemoteStoreOptions
+{
+ std::string Url;
+ std::string ProjectId;
+ std::string OplogId;
+};
+
+std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options);
+
+} // namespace zen
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 260b83355..dbb185bec 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -6,6 +6,7 @@
#include "diag/logging.h"
#include <zencore/compactbinary.h>
+#include <zencore/compositebuffer.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/string.h>
@@ -437,6 +438,47 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
}
CloudCacheResult
+CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
+{
+ ZEN_TRACE_CPU("HordeClient::PutCompressedBlob");
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
+
+ cpr::Session& Session = GetSession();
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
+
+ cpr::Response Response = Session.Put();
+ ZEN_DEBUG("PUT {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
+}
+
+CloudCacheResult
CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
{
ZEN_TRACE_CPU("HordeClient::PutObject");
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 88ab77247..99e5c530f 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -104,6 +104,7 @@ public:
PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index cf771c6de..9eae2761d 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -262,7 +262,7 @@ public:
ZEN_INFO("instantiating project service");
m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager);
- m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore});
+ m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, *m_AuthMgr});
#if ZEN_WITH_COMPUTE_SERVICES
if (ServerOptions.ComputeServiceEnabled)
diff --git a/zenstore/include/zenstore/scrubcontext.h b/zenstore/include/zenstore/scrubcontext.h
index bf906492c..0b884fcc6 100644
--- a/zenstore/include/zenstore/scrubcontext.h
+++ b/zenstore/include/zenstore/scrubcontext.h
@@ -3,6 +3,7 @@
#pragma once
#include <zencore/timer.h>
+#include <zenstore/hashkeyset.h>
namespace zen {
diff --git a/zenutil/include/zenutil/zenserverprocess.h b/zenutil/include/zenutil/zenserverprocess.h
index e6b0a6710..3ec4b19b0 100644
--- a/zenutil/include/zenutil/zenserverprocess.h
+++ b/zenutil/include/zenutil/zenserverprocess.h
@@ -7,8 +7,6 @@
#include <zencore/thread.h>
#include <zencore/uid.h>
-#include <gsl/gsl-lite.hpp>
-
#include <atomic>
#include <filesystem>
#include <optional>