diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-22 08:22:06 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-22 14:22:06 +0200 |
| commit | c7d4dc6a4d13881028d566f5ce501335e47e48bf (patch) | |
| tree | 493110da583a8e5d97fe05e14f23469ee6244d2b /src/zen/cmds/projectstore_cmd.cpp | |
| parent | add trace command to enable/disable tracing at runtime (#416) (diff) | |
| download | archived-zen-c7d4dc6a4d13881028d566f5ce501335e47e48bf.tar.xz archived-zen-c7d4dc6a4d13881028d566f5ce501335e47e48bf.zip | |
Collect all zen admin-related commands into admin.h/.cpp (#418)
* move commands in scrub.h/cpp to admin_cmd.h/cpp
* move job command into admin_cmd.h/.cpp
* admin -> admin_cmd
* bench -> bench_cmd
* cache -> cache_cmd
* copy -> copy_cmd
* dedup -> dedup_cmd
* hash -> hash_cmd
* print -> print_cmd
* projectstore -> projectstore_cmd
* rpcreplay -> rpcreplay_cmd
* serve -> serve_cmd
* status -> status_cmd
* top -> top_cmd
* trace -> trace_cmd
* up -> up_cmd
* version -> version_cmd
Diffstat (limited to 'src/zen/cmds/projectstore_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 1558 |
1 files changed, 1558 insertions, 0 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp new file mode 100644 index 000000000..5795b3190 --- /dev/null +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -0,0 +1,1558 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "projectstore_cmd.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/formatters.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/httpcommon.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <signal.h> + +namespace zen { + +namespace { + + using namespace std::literals; + + const std::string DefaultCloudAccessTokenEnvVariableName( +#if ZEN_PLATFORM_WINDOWS + "UE-CloudDataCacheAccessToken"sv +#endif +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + "UE_CloudDataCacheAccessToken"sv +#endif + + ); + + IoBuffer MakeCbObjectPayload(std::function<void(CbObjectWriter& Writer)> WriteCB) + { + CbObjectWriter Writer; + WriteCB(Writer); + IoBuffer Payload = Writer.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + return Payload; + }; + + static std::atomic_uint32_t SignalCounter[NSIG] = {0}; + + static void SignalCallbackHandler(int SigNum) + { + if (SigNum >= 0 && SigNum < NSIG) + { + SignalCounter[SigNum].fetch_add(1); + } + } + + void AsyncPost(HttpClient& Http, std::string_view Url, IoBuffer&& Payload) + { + if (HttpClient::Response Result = Http.Post(Url, Payload)) + { + if (Result.StatusCode == HttpResponseCode::Accepted) + { + signal(SIGINT, SignalCallbackHandler); + bool Cancelled = false; + + std::string_view JobIdText = Result.AsText(); + std::optional<uint64_t> JobIdMaybe = ParseInt<uint64_t>(JobIdText); + if (!JobIdMaybe) + { + Result.ThrowError("invalid job id"sv); + } + + std::string LastCurrentOp; + uint32_t LastCurrentOpPercentComplete = 0; + + uint64_t JobId = JobIdMaybe.value(); + while (true) + { + HttpClient::Response StatusResult = + Http.Get(fmt::format("/admin/jobs/{}", JobId), HttpClient::Accept(ZenContentType::kCbObject)); + if (!StatusResult) + { + StatusResult.ThrowError("failed to create project"sv); + } + CbObject StatusObject = StatusResult.AsObject(); + std::string_view Status = StatusObject["Status"sv].AsString(); + CbArrayView Messages = StatusObject["Messages"sv].AsArrayView(); + for (auto M : Messages) + { + std::string_view Message = M.AsString(); + ZEN_CONSOLE("{}", Message); + } + if (Status == "Complete") + { + if (Cancelled) + { + ZEN_CONSOLE("Cancelled"); + } + else + { + double QueueTimeS = StatusObject["QueueTimeS"].AsDouble(); + double RuntimeS = StatusObject["RunTimeS"].AsDouble(); + ZEN_CONSOLE("Completed: QueueTime: {:.3} s, RunTime: {:.3} s", QueueTimeS, RuntimeS); + } + break; + } + if (Status == "Aborted") + { + Result.ThrowError("Aborted"); + break; + } + if (Status == "Queued") + { + double QueueTimeS = StatusObject["QueueTimeS"].AsDouble(); + ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS); + } + if (Status == "Running") + { + std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString(); + uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32(); + if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete) + { + LastCurrentOp = CurrentOp; + LastCurrentOpPercentComplete = CurrentOpPercentComplete; + ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete); + } + } + uint32_t AbortCounter = SignalCounter[SIGINT].load(); + if (SignalCounter[SIGINT] > 0) + { + SignalCounter[SIGINT].fetch_sub(AbortCounter); + if (HttpClient::Response DeleteResult = Http.Delete(fmt::format("/admin/jobs/{}", JobId))) + { + ZEN_CONSOLE("Requested cancel..."); + Cancelled = true; + } + else + { + ZEN_CONSOLE("Failed cancelling job {}", DeleteResult); + } + continue; + } + Sleep(100); + } + } + else + { + ZEN_CONSOLE("{}", Result); + } + } + else + { + Result.ThrowError("failed to start operation"sv); + } + } + +} // namespace + +/////////////////////////////////////// + +DropProjectCommand::DropProjectCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.parse_positional({"project", "oplog"}); + m_Options.positional_help("[<projectid> [<oplogid>]]"); +} + +DropProjectCommand::~DropProjectCommand() +{ +} + +int +DropProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (m_ProjectName.empty()) + { + throw OptionParseException("Drop command requires a project"); + } + + HttpClient Http(m_HostName); + if (m_OplogName.empty()) + { + ZEN_CONSOLE("Dropping project '{}' from '{}'", m_ProjectName, m_HostName); + if (HttpClient::Response Result = Http.Delete(fmt::format("/prj/{}", m_ProjectName))) + { + ZEN_CONSOLE("{}", Result); + } + else + { + Result.ThrowError("delete project failed"sv); + return 1; + } + } + else + { + ZEN_CONSOLE("Dropping oplog '{}/{}' from '{}'", m_ProjectName, m_OplogName, m_HostName); + if (HttpClient::Response Result = Http.Delete(fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName))) + { + ZEN_CONSOLE("{}", Result); + } + else + { + Result.ThrowError("delete oplog failed"sv); + return 1; + } + } + + return 0; +} + +/////////////////////////////////////// + +ProjectInfoCommand::ProjectInfoCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.parse_positional({"project", "oplog"}); + m_Options.positional_help("[<projectid> [<oplogid>]]"); +} + +ProjectInfoCommand::~ProjectInfoCommand() +{ +} + +int +ProjectInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (!m_OplogName.empty() && m_ProjectName.empty()) + { + throw OptionParseException("an oplog can't be specified without also specifying a project"); + } + + HttpClient Http(m_HostName); + + std::string Url; + if (m_ProjectName.empty()) + { + Url = "/prj"; + ZEN_CONSOLE("Info from '{}'", Url); + } + else if (m_OplogName.empty()) + { + Url = fmt::format("/prj/{}", m_ProjectName); + ZEN_CONSOLE("Info on project '{}' from '{}{}'", m_ProjectName, m_HostName, Url); + } + else + { + Url = fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName); + ZEN_CONSOLE("Info on oplog '{}/{}' from '{}{}'", m_ProjectName, m_OplogName, m_HostName, Url); + } + + if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) + { + ZEN_CONSOLE("{}", Result.ToText()); + } + else + { + Result.ThrowError("failed to fetch info"sv); + return 1; + } + return 1; +} + +/////////////////////////////////////// + +CreateProjectCommand::CreateProjectCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectId), "<projectid>"); + m_Options.add_option("", "", "rootdir", "Absolute path to root directory", cxxopts::value(m_RootDir), "<root>"); + m_Options.add_option("", "", "enginedir", "Absolute path to engine root directory", cxxopts::value(m_EngineRootDir), "<engineroot>"); + m_Options.add_option("", "", "projectdir", "Absolute path to project directory", cxxopts::value(m_ProjectRootDir), "<projectroot>"); + m_Options.add_option("", "", "projectfile", "Absolute path to .uproject file", cxxopts::value(m_ProjectFile), "<projectfile>"); + m_Options.add_option("", "f", "force-update", "Force update of existing project", cxxopts::value(m_ForceUpdate), "<force-update>"); + m_Options.parse_positional({"project", "rootdir", "enginedir", "projectdir", "projectfile"}); +} + +CreateProjectCommand::~CreateProjectCommand() = default; + +int +CreateProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + using namespace std::literals; + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (m_ProjectId.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + HttpClient Http(m_HostName); + + std::string Url = fmt::format("/prj/{}", m_ProjectId); + + if (!m_ForceUpdate) + { + if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) + { + ZEN_CONSOLE("Project already exists.\n{}", Result.ToText()); + return 1; + } + } + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer.AddString("id"sv, m_ProjectId); + Writer.AddString("root"sv, m_RootDir); + Writer.AddString("engine"sv, m_EngineRootDir); + Writer.AddString("project"sv, m_ProjectRootDir); + Writer.AddString("projectfile"sv, m_ProjectFile); + }); + if (HttpClient::Response Result = m_ForceUpdate ? Http.Put(Url, Payload, HttpClient::Accept(ZenContentType::kText)) + : Http.Post(Url, Payload, HttpClient::Accept(ZenContentType::kText))) + { + ZEN_CONSOLE("{}", Result); + return 0; + } + else + { + Result.ThrowError("failed to create project"sv); + return 1; + } +} + +/////////////////////////////////////// + +DeleteProjectCommand::DeleteProjectCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectId), "<projectid>"); +} + +DeleteProjectCommand::~DeleteProjectCommand() = default; + +int +DeleteProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + using namespace std::literals; + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (m_ProjectId.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + HttpClient Http(m_HostName); + + std::string Url = fmt::format("/prj/{}", m_ProjectId); + + if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)); !Result) + { + Result.ThrowError("failed deleting project"sv); + return 1; + } + + if (HttpClient::Response Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kText))) + { + ZEN_CONSOLE("{}", Result); + return 0; + } + else + { + Result.ThrowError("failed deleting project"sv); + return 1; + } +} + +/////////////////////////////////////// + +CreateOplogCommand::CreateOplogCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectId), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogId), "<oplogid>"); + m_Options.add_option("", "", "gcpath", "Absolute path to oplog lifetime marker file", cxxopts::value(m_GcPath), "<path>"); + m_Options.add_option("", "f", "force-update", "Force update of existing oplog", cxxopts::value(m_ForceUpdate), "<force-update>"); + m_Options.parse_positional({"project", "oplog", "gcpath"}); +} + +CreateOplogCommand::~CreateOplogCommand() = default; + +int +CreateOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + using namespace std::literals; + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (m_ProjectId.empty()) + { + throw OptionParseException("project name must be specified"); + } + + if (m_OplogId.empty()) + { + throw OptionParseException("oplog name must be specified"); + } + + HttpClient Http(m_HostName); + + std::string Url = fmt::format("/prj/{}/oplog/{}", m_ProjectId, m_OplogId); + if (!m_ForceUpdate) + { + if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) + { + ZEN_CONSOLE("Oplog already exists.\n{}", Result.ToText()); + return 1; + } + } + + IoBuffer OplogPayload; + if (!m_GcPath.empty()) + { + OplogPayload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("gcpath"sv, m_GcPath); }); + } + + if (HttpClient::Response Result = m_ForceUpdate ? Http.Put(Url, OplogPayload, HttpClient::Accept(ZenContentType::kText)) + : Http.Post(Url, OplogPayload, HttpClient::Accept(ZenContentType::kText))) + { + ZEN_CONSOLE("{}", Result); + return 0; + } + else + { + Result.ThrowError("failed to create oplog"sv); + return 1; + } +} + +/////////////////////////////////////// + +DeleteOplogCommand::DeleteOplogCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectId), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogId), "<oplogid>"); + m_Options.parse_positional({"project", "oplog", "gcpath"}); +} + +DeleteOplogCommand::~DeleteOplogCommand() = default; + +int +DeleteOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + using namespace std::literals; + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (m_ProjectId.empty()) + { + throw OptionParseException("project name must be specified"); + } + + if (m_OplogId.empty()) + { + throw OptionParseException("oplog name must be specified"); + } + + HttpClient Http(m_HostName); + std::string Url = fmt::format("/prj/{}/oplog/{}", m_ProjectId, m_OplogId); + + if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)); !Result) + { + Result.ThrowError("failed deleting oplog"sv); + return 1; + } + + if (HttpClient::Response Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kText))) + { + ZEN_CONSOLE("{}", Result); + return 0; + } + else + { + Result.ThrowError("failed deleting oplog"sv); + return 1; + } +} + +/////////////////////////////////////// + +ExportOplogCommand::ExportOplogCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.add_option("", "", "maxblocksize", "Max size for bundled attachments", cxxopts::value(m_MaxBlockSize), "<blocksize>"); + m_Options.add_option("", + "", + "maxchunkembedsize", + "Max size for attachment to be bundled", + cxxopts::value(m_MaxChunkEmbedSize), + "<chunksize>"); + m_Options.add_option("", + "", + "embedloosefiles", + "Export additional files referenced by path as attachments", + cxxopts::value(m_EmbedLooseFiles), + "<embedloosefiles>"); + m_Options.add_option("", "f", "force", "Force export of all attachments", cxxopts::value(m_Force), "<force>"); + m_Options.add_option("", + "", + "disableblocks", + "Disable block creation and save all attachments individually (applies to file and cloud target)", + cxxopts::value(m_DisableBlocks), + "<disable>"); + m_Options.add_option("", "a", "async", "Trigger export but don't wait for completion", cxxopts::value(m_Async), "<async>"); + + m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); + m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); + m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>"); + m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>"); + m_Options.add_option("cloud", + "", + "basekey", + "Optional Base Cloud Storage key for incremental export", + cxxopts::value(m_BaseCloudKey), + "<key>"); + m_Options + .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>"); + m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>"); + m_Options.add_option("cloud", + "", + "access-token-env", + "Name of environment variable that holds the cloud Storage access token", + cxxopts::value(m_CloudAccessTokenEnv)->default_value(DefaultCloudAccessTokenEnvVariableName), + "<envvariable>"); + m_Options.add_option("cloud", + "", + "assume-http2", + "Assume that the cloud endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake", + cxxopts::value(m_CloudAssumeHttp2), + "<assumehttp2>"); + m_Options.add_option("cloud", + "", + "disabletempblocks", + "Disable temp block creation and upload blocks without waiting for oplog container to be uploaded", + cxxopts::value(m_CloudDisableTempBlocks), + "<disable>"); + + m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>"); + m_Options.add_option("zen", "", "target-project", "Zen target project name", cxxopts::value(m_ZenProjectName), "<targetprojectid>"); + m_Options.add_option("zen", "", "target-oplog", "Zen target oplog name", cxxopts::value(m_ZenOplogName), "<targetoplogid>"); + m_Options.add_option("zen", "", "clean", "Delete existing target Zen oplog", cxxopts::value(m_ZenClean), "<clean>"); + + m_Options.add_option("", "", "file", "Local folder path", cxxopts::value(m_FileDirectoryPath), "<path>"); + m_Options.add_option("file", "", "name", "Local file name", cxxopts::value(m_FileName), "<filename>"); + m_Options.add_option("file", + "", + "basename", + "Local base file name for incremental oplog export", + cxxopts::value(m_BaseFileName), + "<filename>"); + m_Options.add_option("file", + "", + "forcetempblocks", + "Force creation of temp attachment blocks", + cxxopts::value(m_FileForceEnableTempBlocks), + "<forcetempblocks>"); + + m_Options.parse_positional({"project", "oplog"}); +} + +ExportOplogCommand::~ExportOplogCommand() +{ +} + +int +ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (m_ProjectName.empty()) + { + throw OptionParseException("project name must be specified"); + } + + if (m_OplogName.empty()) + { + throw OptionParseException("oplog identifier must be specified"); + } + + size_t TargetCount = 0; + TargetCount += m_CloudUrl.empty() ? 0 : 1; + TargetCount += m_ZenUrl.empty() ? 0 : 1; + TargetCount += m_FileDirectoryPath.empty() ? 0 : 1; + if (TargetCount != 1) + { + if (TargetCount == 0) + { + throw OptionParseException("an export target must be specified"); + } + else + { + throw OptionParseException("a single export target must be specified"); + } + } + + if (!m_CloudUrl.empty()) + { + if (m_CloudNamespace.empty() || m_CloudBucket.empty()) + { + ZEN_ERROR("Options for cloud target are missing"); + ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str()); + return 1; + } + if (m_CloudKey.empty()) + { + std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket); + IoHash Key = IoHash::HashBuffer(KeyString.data(), KeyString.size()); + m_CloudKey = Key.ToHexString(); + ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey); + } + } + + if (!m_ZenUrl.empty()) + { + if (m_ZenProjectName.empty()) + { + m_ZenProjectName = m_ProjectName; + ZEN_WARN("Using default zen target project id '{}'", m_ZenProjectName); + } + if (m_ZenOplogName.empty()) + { + m_ZenOplogName = m_OplogName; + ZEN_WARN("Using default zen target oplog id '{}'", m_ZenOplogName); + } + + std::string TargetUrlBase = m_ZenUrl; + if (TargetUrlBase.find("://") == std::string::npos) + { + // Assume https URL + TargetUrlBase = fmt::format("http://{}", TargetUrlBase); + } + + HttpClient Http(TargetUrlBase); + std::string Url = fmt::format("/prj/{}/oplog/{}", m_ZenProjectName, m_ZenOplogName); + + bool CreateOplog = false; + if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) + { + if (m_ZenClean) + { + ZEN_WARN("Deleting zen remote oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName) + Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kJSON)); + if (!Result) + { + Result.ThrowError("failed deleting existing zen remote oplog"sv); + return 1; + } + CreateOplog = true; + } + } + else if (Result.StatusCode == HttpResponseCode::NotFound) + { + CreateOplog = true; + } + else + { + Result.ThrowError("failed checking zen remote oplog"sv); + return 1; + } + + if (CreateOplog) + { + ZEN_WARN("Creating zen remote oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName); + if (HttpClient::Response Result = Http.Post(Url); !Result) + { + Result.ThrowError("failed creating zen remote oplog"sv); + return 1; + } + } + } + + if (!m_FileDirectoryPath.empty()) + { + if (m_FileName.empty()) + { + m_FileName = m_OplogName; + ZEN_WARN("Using default file name '{}'", m_FileName); + } + } + + std::string TargetDescription; + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer.AddString("method"sv, "export"sv); + Writer.BeginObject("params"sv); + { + if (m_MaxBlockSize != 0) + { + Writer.AddInteger("maxblocksize"sv, m_MaxBlockSize); + } + if (m_MaxChunkEmbedSize != 0) + { + Writer.AddInteger("maxchunkembedsize"sv, m_MaxChunkEmbedSize); + } + if (m_EmbedLooseFiles) + { + Writer.AddBool("embedloosefiles"sv, true); + } + if (m_Force) + { + Writer.AddBool("force"sv, true); + } + Writer.AddBool("async"sv, true); + if (!m_FileDirectoryPath.empty()) + { + Writer.BeginObject("file"sv); + { + Writer.AddString("path"sv, m_FileDirectoryPath); + Writer.AddString("name"sv, m_FileName); + if (!m_BaseFileName.empty()) + { + Writer.AddString("basename"sv, m_BaseFileName); + } + if (m_DisableBlocks) + { + Writer.AddBool("disableblocks"sv, true); + } + if (m_FileForceEnableTempBlocks) + { + Writer.AddBool("enabletempblocks"sv, true); + } + } + Writer.EndObject(); // "file" + TargetDescription = fmt::format("[file] {}/{}{}{}", + m_FileDirectoryPath, + m_FileName, + m_BaseFileName.empty() ? "" : " Base: ", + m_BaseFileName); + } + if (!m_CloudUrl.empty()) + { + Writer.BeginObject("cloud"sv); + { + Writer.AddString("url"sv, m_CloudUrl); + Writer.AddString("namespace"sv, m_CloudNamespace); + Writer.AddString("bucket"sv, m_CloudBucket); + Writer.AddString("key"sv, m_CloudKey); + if (!m_BaseCloudKey.empty()) + { + Writer.AddString("basekey"sv, m_BaseCloudKey); + } + if (!m_CloudOpenIdProvider.empty()) + { + Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider); + } + if (!m_CloudAccessToken.empty()) + { + Writer.AddString("access-token"sv, m_CloudAccessToken); + } + if (!m_CloudAccessTokenEnv.empty()) + { + std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_CloudAccessTokenEnv); + + if (!ResolvedCloudAccessTokenEnv.empty()) + { + Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv); + } + else + { + Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv); + } + } + if (m_CloudAssumeHttp2) + { + Writer.AddBool("assumehttp2"sv, true); + } + if (m_DisableBlocks) + { + Writer.AddBool("disableblocks"sv, true); + } + if (m_CloudDisableTempBlocks) + { + Writer.AddBool("disabletempblocks"sv, true); + } + } + Writer.EndObject(); // "cloud" + TargetDescription = fmt::format("[cloud] {}/{}/{}/{}{}{}", + m_CloudUrl, + m_CloudNamespace, + m_CloudBucket, + m_CloudKey, + m_BaseCloudKey.empty() ? "" : " Base: ", + m_BaseCloudKey); + } + if (!m_ZenUrl.empty()) + { + Writer.BeginObject("zen"sv); + { + Writer.AddString("url"sv, m_ZenUrl); + Writer.AddString("project"sv, m_ZenProjectName); + Writer.AddString("oplog"sv, m_ZenOplogName); + } + Writer.EndObject(); // "zen" + + TargetDescription = fmt::format("[zen] {}/{}/{}", m_ZenUrl, m_ZenProjectName, m_ZenOplogName); + } + } + Writer.EndObject(); // "params" + }); + + ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription); + + HttpClient Http(m_HostName); + if (m_Async) + { + if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + HttpClient::Accept(ZenContentType::kJSON)); + Result) + { + ZEN_CONSOLE("{}", Result.AsText()); + } + else + { + Result.ThrowError("failed requesting loading oplog export"sv); + return 1; + } + } + else + { + AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload)); + } + return 0; +} + +//////////////////////////// + +ImportOplogCommand::ImportOplogCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.add_option("", "", "maxblocksize", "Max size for bundled attachments", cxxopts::value(m_MaxBlockSize), "<blocksize>"); + m_Options.add_option("", + "", + "maxchunkembedsize", + "Max size for attachment to be bundled", + cxxopts::value(m_MaxChunkEmbedSize), + "<chunksize>"); + m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>"); + m_Options.add_option("", "a", "async", "Trigger import but don't wait for completion", cxxopts::value(m_Async), "<async>"); + + m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); + m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); + m_Options.add_option("cloud", "", "bucket", "Cloud Storage bucket", cxxopts::value(m_CloudBucket), "<bucket>"); + m_Options.add_option("cloud", "", "key", "Cloud Storage key", cxxopts::value(m_CloudKey), "<key>"); + m_Options + .add_option("cloud", "", "openid-provider", "Cloud Storage openid provider", cxxopts::value(m_CloudOpenIdProvider), "<provider>"); + m_Options.add_option("cloud", "", "access-token", "Cloud Storage access token", cxxopts::value(m_CloudAccessToken), "<accesstoken>"); + m_Options.add_option("cloud", + "", + "access-token-env", + "Name of environment variable that holds the cloud Storage access token", + cxxopts::value(m_CloudAccessTokenEnv)->default_value(DefaultCloudAccessTokenEnvVariableName), + "<envvariable>"); + m_Options.add_option("cloud", + "", + "assume-http2", + "Assume that the cloud endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake", + cxxopts::value(m_CloudAssumeHttp2), + "<assumehttp2>"); + + m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>"); + m_Options.add_option("zen", "", "source-project", "Zen source project name", cxxopts::value(m_ZenProjectName), "<sourceprojectid>"); + m_Options.add_option("zen", "", "source-oplog", "Zen source oplog name", cxxopts::value(m_ZenOplogName), "<sourceoplogid>"); + m_Options.add_option("zen", "", "clean", "Delete existing target Zen oplog", cxxopts::value(m_ZenClean), "<clean>"); + + m_Options.add_option("", "", "file", "Local folder path", cxxopts::value(m_FileDirectoryPath), "<path>"); + m_Options.add_option("file", "", "name", "Local file name", cxxopts::value(m_FileName), "<filename>"); + + m_Options.parse_positional({"project", "oplog"}); +} + +ImportOplogCommand::~ImportOplogCommand() +{ +} + +int +ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (m_ProjectName.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + if (m_OplogName.empty()) + { + ZEN_ERROR("Oplog name must be given"); + return 1; + } + + size_t TargetCount = 0; + TargetCount += m_CloudUrl.empty() ? 0 : 1; + TargetCount += m_ZenUrl.empty() ? 0 : 1; + TargetCount += m_FileDirectoryPath.empty() ? 0 : 1; + if (TargetCount != 1) + { + ZEN_ERROR("Provide one source only"); + ZEN_CONSOLE("{}", m_Options.help({""}).c_str()); + return 1; + } + + if (!m_CloudUrl.empty()) + { + if (m_CloudNamespace.empty() || m_CloudBucket.empty()) + { + ZEN_ERROR("Options for cloud source are missing"); + ZEN_CONSOLE("{}", m_Options.help({"cloud"}).c_str()); + return 1; + } + if (m_CloudKey.empty()) + { + std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket); + IoHash Key = IoHash::HashBuffer(KeyString.data(), KeyString.size()); + m_CloudKey = Key.ToHexString(); + ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey); + } + } + + if (!m_ZenUrl.empty()) + { + if (m_ZenProjectName.empty()) + { + m_ZenProjectName = m_ProjectName; + ZEN_WARN("Using default zen target project id '{}'", m_ZenProjectName); + } + if (m_ZenOplogName.empty()) + { + m_ZenOplogName = m_OplogName; + ZEN_WARN("Using default zen target oplog id '{}'", m_ZenOplogName); + } + } + + if (!m_FileDirectoryPath.empty()) + { + if (m_FileName.empty()) + { + m_FileName = m_OplogName; + ZEN_WARN("Using auto generated file name '{}'", m_FileName); + } + } + + HttpClient Http(m_HostName); + std::string Url = fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName); + + bool CreateOplog = false; + if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) + { + if (m_ZenClean) + { + ZEN_WARN("Deleting oplog '{}/{}'", m_ProjectName, m_OplogName) + Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kJSON)); + if (!Result) + { + Result.ThrowError("failed deleting existing oplog"sv); + return 1; + } + CreateOplog = true; + } + } + else if (Result.StatusCode == HttpResponseCode::NotFound) + { + CreateOplog = true; + } + else + { + Result.ThrowError("failed checking oplog"sv); + return 1; + } + + if (CreateOplog) + { + ZEN_WARN("Creating oplog '{}/{}'", m_ProjectName, m_OplogName); + if (HttpClient::Response Result = Http.Post(Url); !Result) + { + Result.ThrowError("failed creating oplog"sv); + return 1; + } + } + + std::string SourceDescription; + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + Writer.AddString("method"sv, "import"sv); + Writer.BeginObject("params"sv); + { + if (m_Force) + { + Writer.AddBool("force"sv, true); + } + if (!m_FileDirectoryPath.empty()) + { + Writer.BeginObject("file"sv); + { + Writer.AddString("path"sv, m_FileDirectoryPath); + Writer.AddString("name"sv, m_FileName); + } + Writer.EndObject(); // "file" + SourceDescription = fmt::format("[file] {}/{}", m_FileDirectoryPath, m_FileName); + } + if (!m_CloudUrl.empty()) + { + Writer.BeginObject("cloud"sv); + { + Writer.AddString("url"sv, m_CloudUrl); + Writer.AddString("namespace"sv, m_CloudNamespace); + Writer.AddString("bucket"sv, m_CloudBucket); + Writer.AddString("key"sv, m_CloudKey); + if (!m_CloudOpenIdProvider.empty()) + { + Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider); + } + if (!m_CloudAccessToken.empty()) + { + Writer.AddString("access-token"sv, m_CloudAccessToken); + } + if (!m_CloudAccessTokenEnv.empty()) + { + std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_CloudAccessTokenEnv); + + if (!ResolvedCloudAccessTokenEnv.empty()) + { + Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv); + } + else + { + Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv); + } + } + if (m_CloudAssumeHttp2) + { + Writer.AddBool("assumehttp2"sv, true); + } + } + Writer.EndObject(); // "cloud" + SourceDescription = fmt::format("[cloud] {}/{}/{}/{}", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey); + } + if (!m_ZenUrl.empty()) + { + Writer.BeginObject("zen"sv); + { + Writer.AddString("url"sv, m_ZenUrl); + Writer.AddString("project"sv, m_ZenProjectName); + Writer.AddString("oplog"sv, m_ZenOplogName); + } + Writer.EndObject(); // "zen" + SourceDescription = fmt::format("[zen] {}", m_ZenUrl); + } + } + Writer.EndObject(); // "params" + }); + + ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName); + + if (m_Async) + { + if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), + std::move(Payload), + HttpClient::Accept(ZenContentType::kJSON)); + Result) + { + ZEN_CONSOLE("{}", Result.AsText()); + } + else + { + Result.ThrowError("failed requesting loading oplog import"sv); + return 1; + } + } + else + { + AsyncPost(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload)); + } + return 0; +} + +//////////////////////////// + +SnapshotOplogCommand::SnapshotOplogCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + + m_Options.parse_positional({"project", "oplog"}); +} + +SnapshotOplogCommand::~SnapshotOplogCommand() +{ +} + +int +SnapshotOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (m_ProjectName.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + if (m_OplogName.empty()) + { + ZEN_ERROR("Oplog name must be given"); + return 1; + } + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); }); + + HttpClient Http(m_HostName); + + ZEN_CONSOLE("Snapshotting oplog '{}/{}' to {}", m_ProjectName, m_OplogName, m_HostName); + if (HttpClient::Response Result = + Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload, HttpClient::Accept(ZenContentType::kJSON))) + { + ZEN_CONSOLE("{}", Result); + return 0; + } + else + { + Result.ThrowError("failed to create project"sv); + return 1; + } +} + +//////////////////////////// + +ProjectStatsCommand::ProjectStatsCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); +} + +ProjectStatsCommand::~ProjectStatsCommand() +{ +} + +int +ProjectStatsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + HttpClient Http(m_HostName); + if (HttpClient::Response Result = Http.Get("/stats/prj", HttpClient::Accept(ZenContentType::kJSON))) + { + ZEN_CONSOLE("{}", Result.AsText()); + return 0; + } + else + { + Result.ThrowError("failed to get project stats"sv); + return 1; + } +} + +//////////////////////////// + +ProjectDetailsCommand::ProjectDetailsCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "c", "csv", "Output in CSV format (default is JSon)", cxxopts::value(m_CSV), "<csv>"); + m_Options.add_option("", "d", "details", "Detailed info on oplog", cxxopts::value(m_Details), "<details>"); + m_Options.add_option("", "o", "opdetails", "Details info on oplog body", cxxopts::value(m_OpDetails), "<opdetails>"); + m_Options.add_option("", "p", "project", "Project name to get info from", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "l", "oplog", "Oplog name to get info from", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.add_option("", "i", "opid", "Oid of a specific op info for", cxxopts::value(m_OpId), "<opid>"); + m_Options.add_option("", + "a", + "attachmentdetails", + "Get detailed information about attachments", + cxxopts::value(m_AttachmentDetails), + "<attachmentdetails>"); +} + +ProjectDetailsCommand::~ProjectDetailsCommand() +{ +} + +int +ProjectDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (!m_OpId.empty()) + { + if (m_ProjectName.empty() || m_OplogName.empty()) + { + ZEN_ERROR("Provide project and oplog name"); + ZEN_CONSOLE("{}", m_Options.help({""}).c_str()); + return 1; + } + } + else if (!m_OplogName.empty()) + { + if (m_ProjectName.empty()) + { + ZEN_ERROR("Provide project name"); + ZEN_CONSOLE("{}", m_Options.help({""}).c_str()); + return 1; + } + } + + HttpClient Http(m_HostName); + + ExtendableStringBuilder<128> Url; + Url.Append("/prj/details$"); + if (!m_ProjectName.empty()) + { + Url.Append("/"); + Url.Append(m_ProjectName); + } + if (!m_OplogName.empty()) + { + Url.Append("/"); + Url.Append(m_OplogName); + } + if (!m_OpId.empty()) + { + Url.Append("/"); + Url.Append(m_OpId); + } + + if (HttpClient::Response Result = + Http.Get(Url, + m_CSV ? HttpClient::Accept(ZenContentType::kText) : HttpClient::Accept(ZenContentType::kJSON), + {{"opdetails", m_OpDetails ? "true" : "false"}, + {"details", m_Details ? "true" : "false"}, + {"attachmentdetails", m_AttachmentDetails ? "true" : "false"}, + {"csv", m_CSV ? "true" : "false"}})) + { + ZEN_CONSOLE("{}", Result.AsText()); + return 0; + } + else + { + Result.ThrowError("failed to get project details"sv); + return 1; + } +} + +//////////////////////////// + +OplogMirrorCommand::OplogMirrorCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "p", "project", "Project name to get info from", cxxopts::value(m_ProjectName), "<projectid>"); + m_Options.add_option("", "l", "oplog", "Oplog name to get info from", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.add_option("", "t", "target", "Target directory for mirror", cxxopts::value(m_MirrorRootPath), "<path>"); + + m_Options.parse_positional({"project", "oplog", "target"}); + m_Options.positional_help("[<projectid> <oplogid> <target>]"); +} + +OplogMirrorCommand::~OplogMirrorCommand() +{ +} + +int +OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + if (m_ProjectName.empty()) + { + throw OptionParseException("a project must be specified"); + } + + if (m_OplogName.empty()) + { + throw OptionParseException("an oplog must be specified"); + } + + if (m_MirrorRootPath.empty()) + { + throw OptionParseException("a target path must be specified"); + } + + ZEN_CONSOLE("Emitting file data from oplog '{}/{}' to '{}'", m_ProjectName, m_OplogName, m_MirrorRootPath); + + HttpClient Http(m_HostName); + + if (HttpClient::Response Result = Http.Get(fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName))) + { + // The info requested is not really used at this moment, we just use the probe to be able to provide + // better diagnostics up front + } + else + { + Result.ThrowError("oplog info fetch failed"sv); + + return 1; + } + + // Emit file data to target directory + + std::filesystem::path RootPath{m_MirrorRootPath}; + CreateDirectories(RootPath); + + std::filesystem::path TmpPath = RootPath / ".tmp"; + CreateDirectories(TmpPath); + + std::atomic_int64_t FileCount = 0; + int OplogEntryCount = 0; + + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + Latch WorkRemaining(1); + + std::unordered_set<std::string> FileNames; + + auto EmitFilesForDataArray = [&](CbArrayView DataArray) { + for (auto DataIter : DataArray) + { + if (CbObjectView Data = DataIter.AsObjectView()) + { + std::string FileName = std::string(Data["filename"sv].AsString()); + Oid ChunkId = Data["id"sv].AsObjectId(); + if (!FileNames.insert(FileName).second) + { + continue; + } + WorkRemaining.AddCount(1); + WorkerPool.ScheduleWork([this, &RootPath, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining]() { + auto _ = MakeGuard([&WorkRemaining]() { WorkRemaining.CountDown(); }); + if (HttpClient::Response ChunkResponse = + Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath)) + { + IoBuffer ChunkData = ChunkResponse.ResponsePayload; + std::filesystem::path TargetPath = RootPath / FileName; + if (!MoveToFile(TargetPath, ChunkData)) + { + WriteFile(TargetPath, ChunkData); + } + ++FileCount; + } + else + { + ZEN_CONSOLE("Unable to fetch '{}' (chunk {}). Reason: '{}'", FileName, ChunkId, ChunkResponse.ErrorMessage(""sv)); + } + }); + } + } + }; + + if (HttpClient::Response Response = Http.Get(fmt::format("/prj/{}/oplog/{}/entries"sv, m_ProjectName, m_OplogName))) + { + if (CbObject ResponseObject = Response.AsObject()) + { + for (auto EntryIter : ResponseObject["entries"sv]) + { + CbObjectView Entry = EntryIter.AsObjectView(); + + EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView()); + EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView()); + + ++OplogEntryCount; + } + } + else + { + ZEN_ERROR("unknown format response to oplog entries request"); + } + } + else + { + Response.ThrowError("oplog entries fetch failed"); + + return 1; + } + WorkRemaining.CountDown(); + WorkRemaining.Wait(); + + std::filesystem::remove_all(TmpPath); + + ZEN_CONSOLE("mirrored {} files from {} oplog entries successfully", FileCount.load(), OplogEntryCount); + + return 0; +} + +} // namespace zen |