aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-08 09:14:17 -0400
committerGitHub <[email protected]>2023-09-08 15:14:17 +0200
commit80822c357e1dc6ddc98fa7ecd8e973faa9c81577 (patch)
tree5471e0d1f0f800df745e16f53bed9da41e805170
parentExtend http client (#387) (diff)
downloadzen-80822c357e1dc6ddc98fa7ecd8e973faa9c81577.tar.xz
zen-80822c357e1dc6ddc98fa7ecd8e973faa9c81577.zip
multithread file realization in oplog-mirror (#388)
convert project store commands to use http client use MakeCbObjectPayload everywhere in project store commands
-rw-r--r--src/zen/cmds/projectstore.cpp916
-rw-r--r--src/zen/cmds/projectstore.h4
-rw-r--r--src/zen/zen.cpp2
3 files changed, 464 insertions, 458 deletions
diff --git a/src/zen/cmds/projectstore.cpp b/src/zen/cmds/projectstore.cpp
index 5814b9671..fb3db1fdd 100644
--- a/src/zen/cmds/projectstore.cpp
+++ b/src/zen/cmds/projectstore.cpp
@@ -6,7 +6,10 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/formatters.h>
#include <zenhttp/httpclient.h>
#include <zenhttp/httpcommon.h>
@@ -14,18 +17,30 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
ZEN_THIRD_PARTY_INCLUDES_END
+namespace zen {
+
namespace {
-using namespace std::literals;
+ using namespace std::literals;
-const std::string DefaultCloudAccessTokenEnvVariableName(
+ const std::string DefaultCloudAccessTokenEnvVariableName(
#if ZEN_PLATFORM_WINDOWS
- "UE-CloudDataCacheAccessToken"sv
+ "UE-CloudDataCacheAccessToken"sv
#endif
#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- "UE_CloudDataCacheAccessToken"sv
+ "UE_CloudDataCacheAccessToken"sv
#endif
-);
+
+ );
+
+ IoBuffer MakeCbObjectPayload(std::function<void(CbObjectWriter& Writer)> WriteCB)
+ {
+ CbObjectWriter Writer;
+ WriteCB(Writer);
+ IoBuffer Payload = Writer.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+ return Payload;
+ };
} // namespace
@@ -59,44 +74,43 @@ DropProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
if (m_ProjectName.empty())
{
- throw zen::OptionParseException("Drop command requires a project");
+ throw OptionParseException("Drop command requires a project");
}
- cpr::Session Session;
+ HttpClient Http(m_HostName);
if (m_OplogName.empty())
{
ZEN_CONSOLE("Dropping project '{}' from '{}'", m_ProjectName, m_HostName);
- Session.SetUrl({fmt::format("{}/prj/{}", m_HostName, m_ProjectName)});
+ if (HttpClient::Response Result = Http.Delete(fmt::format("/prj/{}", m_ProjectName)))
+ {
+ ZEN_CONSOLE("{}", Result);
+ }
+ else
+ {
+ Result.ThrowError("delete project failed"sv);
+ return 1;
+ }
}
else
{
ZEN_CONSOLE("Dropping oplog '{}/{}' from '{}'", m_ProjectName, m_OplogName, m_HostName);
- Session.SetUrl({fmt::format("{}/prj/{}/oplog/{}", m_HostName, m_ProjectName, m_OplogName)});
- }
-
- cpr::Response Result = Session.Delete();
-
- if (zen::IsHttpSuccessCode(Result.status_code))
- {
- ZEN_CONSOLE("OK: drop succeeded");
- return 0;
- }
-
- if (Result.status_code)
- {
- ZEN_ERROR("Drop failed: {}: {} ({})", Result.status_code, Result.reason, Result.text);
- }
- else
- {
- ZEN_ERROR("Drop failed: {}", Result.error.message);
+ if (HttpClient::Response Result = Http.Delete(fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName)))
+ {
+ ZEN_CONSOLE("{}", Result);
+ }
+ else
+ {
+ Result.ThrowError("delete oplog failed"sv);
+ return 1;
+ }
}
- return 1;
+ return 0;
}
///////////////////////////////////////
@@ -129,54 +143,42 @@ ProjectInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
if (!m_OplogName.empty() && m_ProjectName.empty())
{
- throw zen::OptionParseException("an oplog can't be specified without also specifying a project");
+ throw OptionParseException("an oplog can't be specified without also specifying a project");
}
- std::string Url;
+ HttpClient Http(m_HostName);
+ std::string Url;
if (m_ProjectName.empty())
{
- Url = fmt::format("{}/prj", m_HostName);
+ Url = "/prj";
ZEN_CONSOLE("Info from '{}'", Url);
}
else if (m_OplogName.empty())
{
- Url = fmt::format("{}/prj/{}", m_HostName, m_ProjectName);
- ZEN_CONSOLE("Info on project '{}' from '{}'", m_ProjectName, Url);
+ Url = fmt::format("/prj/{}", m_ProjectName);
+ ZEN_CONSOLE("Info on project '{}' from '{}{}'", m_ProjectName, m_HostName, Url);
}
else
{
- Url = fmt::format("{}/prj/{}/oplog/{}", m_HostName, m_ProjectName, m_OplogName);
- ZEN_CONSOLE("Info on oplog '{}/{}' from '{}'", m_ProjectName, m_OplogName, Url);
+ Url = fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName);
+ ZEN_CONSOLE("Info on oplog '{}/{}' from '{}{}'", m_ProjectName, m_OplogName, m_HostName, Url);
}
- cpr::Session Session;
- Session.SetHeader(cpr::Header{{"Accept", "application/json"}});
- Session.SetUrl(Url);
-
- cpr::Response Result = Session.Get();
-
- if (zen::IsHttpSuccessCode(Result.status_code))
+ if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)))
{
- ZEN_CONSOLE("{}", Result.text);
-
- return 0;
- }
-
- if (Result.status_code)
- {
- ZEN_ERROR("Info failed: {}: {} ({})", Result.status_code, Result.reason, Result.text);
+ ZEN_CONSOLE("{}", Result.ToText());
}
else
{
- ZEN_ERROR("Info failed: {}", Result.error.message);
+ Result.ThrowError("failed to fetch info"sv);
+ return 1;
}
-
return 1;
}
@@ -213,42 +215,46 @@ CreateProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
- cpr::Session Session;
- Session.SetHeader(cpr::Header{{"Accept", "application/json"}});
-
if (m_ProjectId.empty())
{
ZEN_ERROR("Project name must be given");
return 1;
}
- Session.SetUrl({fmt::format("{}/prj/{}", m_HostName, m_ProjectId)});
- cpr::Response Response = Session.Get();
- if (zen::IsHttpSuccessCode(Response.status_code) && !m_ForceUpdate)
+ HttpClient Http(m_HostName);
+
+ std::string Url = fmt::format("/prj/{}", m_ProjectId);
+
+ if (!m_ForceUpdate)
{
- ZEN_CONSOLE("Project already exists.\n{}", Response.text);
- return 1;
+ if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)))
+ {
+ ZEN_CONSOLE("Project already exists.\n{}", Result.ToText());
+ return 1;
+ }
}
- if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound) || m_ForceUpdate)
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer.AddString("id"sv, m_ProjectId);
+ Writer.AddString("root"sv, m_RootDir);
+ Writer.AddString("engine"sv, m_EngineRootDir);
+ Writer.AddString("project"sv, m_ProjectRootDir);
+ Writer.AddString("projectfile"sv, m_ProjectFile);
+ });
+ if (HttpClient::Response Result = m_ForceUpdate ? Http.Put(Url, Payload, HttpClient::Accept(ZenContentType::kText))
+ : Http.Post(Url, Payload, HttpClient::Accept(ZenContentType::kText)))
{
- zen::CbObjectWriter Project;
- Project.AddString("id"sv, m_ProjectId);
- Project.AddString("root"sv, m_RootDir);
- Project.AddString("engine"sv, m_EngineRootDir);
- Project.AddString("project"sv, m_ProjectRootDir);
- Project.AddString("projectfile"sv, m_ProjectFile);
- zen::IoBuffer ProjectPayload = Project.Save().GetBuffer().AsIoBuffer();
- Session.SetBody(cpr::Body{(const char*)ProjectPayload.GetData(), ProjectPayload.GetSize()});
- Session.SetHeader(cpr::Header{{"Accept", "text"}});
- Response = m_ForceUpdate ? Session.Post() : Session.Put();
+ ZEN_CONSOLE("{}", Result);
+ return 0;
+ }
+ else
+ {
+ Result.ThrowError("failed to create project"sv);
+ return 1;
}
-
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
}
///////////////////////////////////////
@@ -278,36 +284,35 @@ DeleteProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
- cpr::Session Session;
- Session.SetHeader(cpr::Header{{"Accept", "application/json"}});
-
if (m_ProjectId.empty())
{
ZEN_ERROR("Project name must be given");
return 1;
}
- Session.SetUrl({fmt::format("{}/prj/{}", m_HostName, m_ProjectId)});
- cpr::Response Response = Session.Get();
- if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound))
+ HttpClient Http(m_HostName);
+
+ std::string Url = fmt::format("/prj/{}", m_ProjectId);
+
+ if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)); !Result)
{
- ZEN_CONSOLE("Project does not exist.\n{}", Response.text);
+ Result.ThrowError("failed deleting project"sv);
return 1;
}
- if (!zen::IsHttpSuccessCode(Response.status_code))
+
+ if (HttpClient::Response Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kText)))
+ {
+ ZEN_CONSOLE("{}", Result);
+ return 0;
+ }
+ else
{
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
+ Result.ThrowError("failed deleting project"sv);
return 1;
}
-
- Session.SetHeader(cpr::Header{{"Accept", "text"}});
- Response = Session.Delete();
-
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
}
///////////////////////////////////////
@@ -341,48 +346,48 @@ CreateOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
- cpr::Session Session;
- Session.SetHeader(cpr::Header{{"Accept", "application/json"}});
-
if (m_ProjectId.empty())
{
- throw zen::OptionParseException("project name must be specified");
+ throw OptionParseException("project name must be specified");
}
if (m_OplogId.empty())
{
- throw zen::OptionParseException("oplog name must be specified");
+ throw OptionParseException("oplog name must be specified");
}
- Session.SetUrl({fmt::format("{}/prj/{}/oplog/{}", m_HostName, m_ProjectId, m_OplogId)});
- cpr::Response Response = Session.Get();
- if (zen::IsHttpSuccessCode(Response.status_code) && !m_ForceUpdate)
- {
- ZEN_CONSOLE("Oplog already exists.\n{}", Response.text);
- return 1;
- }
+ HttpClient Http(m_HostName);
- if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound) || m_ForceUpdate)
+ std::string Url = fmt::format("/prj/{}/oplog/{}", m_ProjectId, m_OplogId);
+ if (!m_ForceUpdate)
{
- Session.SetHeader(cpr::Header{{"Accept", "text"}});
- if (!m_GcPath.empty())
+ if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)))
{
- zen::CbObjectWriter Oplog;
- Oplog.AddString("gcpath"sv, m_GcPath);
- zen::IoBuffer OplogPayload = Oplog.Save().GetBuffer().AsIoBuffer();
- Session.SetBody(cpr::Body{(const char*)OplogPayload.GetData(), OplogPayload.GetSize()});
- Session.SetHeader(cpr::Header{{"Accept", "text"}, {"Content-Type", std::string(ToString(zen::HttpContentType::kCbObject))}});
+ ZEN_CONSOLE("Oplog already exists.\n{}", Result.ToText());
+ return 1;
}
-
- Response = m_ForceUpdate ? Session.Post() : Session.Put();
}
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
+ IoBuffer OplogPayload;
+ if (!m_GcPath.empty())
+ {
+ OplogPayload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("gcpath"sv, m_GcPath); });
+ }
- return MapHttpToCommandReturnCode(Response);
+ if (HttpClient::Response Result = m_ForceUpdate ? Http.Put(Url, OplogPayload, HttpClient::Accept(ZenContentType::kText))
+ : Http.Post(Url, OplogPayload, HttpClient::Accept(ZenContentType::kText)))
+ {
+ ZEN_CONSOLE("{}", Result);
+ return 0;
+ }
+ else
+ {
+ Result.ThrowError("failed to create oplog"sv);
+ return 1;
+ }
}
///////////////////////////////////////
@@ -414,41 +419,38 @@ DeleteOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
- cpr::Session Session;
- Session.SetHeader(cpr::Header{{"Accept", "application/json"}});
-
if (m_ProjectId.empty())
{
- throw zen::OptionParseException("project name must be specified");
+ throw OptionParseException("project name must be specified");
}
if (m_OplogId.empty())
{
- throw zen::OptionParseException("oplog name must be specified");
+ throw OptionParseException("oplog name must be specified");
}
- Session.SetUrl({fmt::format("{}/prj/{}/oplog/{}", m_HostName, m_ProjectId, m_OplogId)});
- cpr::Response Response = Session.Get();
- if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound))
+ HttpClient Http(m_HostName);
+ std::string Url = fmt::format("/prj/{}/oplog/{}", m_ProjectId, m_OplogId);
+
+ if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)); !Result)
{
- ZEN_CONSOLE("Oplog does not exist.\n{}", Response.text);
+ Result.ThrowError("failed deleting oplog"sv);
return 1;
}
- if (!zen::IsHttpSuccessCode(Response.status_code))
+
+ if (HttpClient::Response Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kText)))
{
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
+ ZEN_CONSOLE("{}", Result);
+ return 0;
+ }
+ else
+ {
+ Result.ThrowError("failed deleting oplog"sv);
return 1;
}
-
- Session.SetHeader(cpr::Header{{"Accept", "text"}});
- Response = Session.Delete();
-
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
-
- return MapHttpToCommandReturnCode(Response);
}
///////////////////////////////////////
@@ -543,17 +545,17 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
if (m_ProjectName.empty())
{
- throw zen::OptionParseException("project name must be specified");
+ throw OptionParseException("project name must be specified");
}
if (m_OplogName.empty())
{
- throw zen::OptionParseException("oplog identifier must be specified");
+ throw OptionParseException("oplog identifier must be specified");
}
size_t TargetCount = 0;
@@ -564,16 +566,14 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
if (TargetCount == 0)
{
- throw zen::OptionParseException("an export target must be specified");
+ throw OptionParseException("an export target must be specified");
}
else
{
- throw zen::OptionParseException("a single export target must be specified");
+ throw OptionParseException("a single export target must be specified");
}
}
- cpr::Session Session;
-
if (!m_CloudUrl.empty())
{
if (m_CloudNamespace.empty() || m_CloudBucket.empty())
@@ -585,7 +585,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_CloudKey.empty())
{
std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket);
- zen::IoHash Key = zen::IoHash::HashBuffer(KeyString.data(), KeyString.size());
+ IoHash Key = IoHash::HashBuffer(KeyString.data(), KeyString.size());
m_CloudKey = Key.ToHexString();
ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey);
}
@@ -604,44 +604,48 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
ZEN_WARN("Using default zen target oplog id '{}'", m_ZenOplogName);
}
- std::string TargetUrlBase = fmt::format("{}/prj", m_ZenUrl);
+ std::string TargetUrlBase = m_ZenUrl;
if (TargetUrlBase.find("://") == std::string::npos)
{
// Assume https URL
TargetUrlBase = fmt::format("http://{}", TargetUrlBase);
}
- Session.SetUrl({fmt::format("{}/{}/oplog/{}", TargetUrlBase, m_ZenProjectName, m_ZenOplogName)});
- cpr::Response Response = Session.Get();
- if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound))
+ HttpClient Http(TargetUrlBase);
+ std::string Url = fmt::format("/prj/{}/oplog/{}", m_ZenProjectName, m_ZenOplogName);
+
+ bool CreateOplog = false;
+ if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)))
{
- ZEN_WARN("Automatically creating oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName)
- Response = Session.Post();
- if (!zen::IsHttpSuccessCode(Response.status_code))
+ if (m_ZenClean)
{
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
+ ZEN_WARN("Deleting zen remote oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName)
+ Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kJSON));
+ if (!Result)
+ {
+ Result.ThrowError("failed deleting existing zen remote oplog"sv);
+ return 1;
+ }
+ CreateOplog = true;
}
}
- else if (!zen::IsHttpSuccessCode(Response.status_code))
+ else if (Result.StatusCode == HttpResponseCode::NotFound)
{
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
+ CreateOplog = true;
}
- else if (m_ZenClean)
+ else
{
- ZEN_WARN("Cleaning oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName)
- Response = Session.Delete();
- if (!zen::IsHttpSuccessCode(Response.status_code))
- {
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
- }
- Response = Session.Post();
- if (!zen::IsHttpSuccessCode(Response.status_code))
+ Result.ThrowError("failed checking zen remote oplog"sv);
+ return 1;
+ }
+
+ if (CreateOplog)
+ {
+ ZEN_WARN("Creating zen remote oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName);
+ if (HttpClient::Response Result = Http.Post(Url); !Result)
{
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
+ Result.ThrowError("failed creating zen remote oplog"sv);
+ return 1;
}
}
}
@@ -655,116 +659,120 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
}
}
- const std::string SourceUrlBase = fmt::format("{}/prj", m_HostName);
- std::string TargetDescription;
- Session.SetUrl({fmt::format("{}/{}/oplog/{}/rpc", SourceUrlBase, m_ProjectName, m_OplogName)});
- Session.SetHeader({{"Content-Type", std::string(zen::MapContentTypeToString(zen::HttpContentType::kCbObject))}});
- zen::CbObjectWriter Writer;
- Writer.AddString("method"sv, "export"sv);
- Writer.BeginObject("params"sv);
- {
- if (m_MaxBlockSize != 0)
- {
- Writer.AddInteger("maxblocksize"sv, m_MaxBlockSize);
- }
- if (m_MaxChunkEmbedSize != 0)
- {
- Writer.AddInteger("maxchunkembedsize"sv, m_MaxChunkEmbedSize);
- }
- if (m_EmbedLooseFiles)
- {
- Writer.AddBool("embedloosefiles"sv, true);
- }
- if (m_Force)
- {
- Writer.AddBool("force"sv, true);
- }
- if (!m_FileDirectoryPath.empty())
+ std::string TargetDescription;
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer.AddString("method"sv, "export"sv);
+ Writer.BeginObject("params"sv);
{
- Writer.BeginObject("file"sv);
+ if (m_MaxBlockSize != 0)
{
- Writer.AddString("path"sv, m_FileDirectoryPath);
- Writer.AddString("name"sv, m_FileName);
- if (m_DisableBlocks)
- {
- Writer.AddBool("disableblocks"sv, true);
- }
- if (m_FileForceEnableTempBlocks)
- {
- Writer.AddBool("enabletempblocks"sv, true);
- }
+ Writer.AddInteger("maxblocksize"sv, m_MaxBlockSize);
}
- Writer.EndObject(); // "file"
- TargetDescription = fmt::format("[file] {}/{}", m_FileDirectoryPath, m_FileName);
- }
- if (!m_CloudUrl.empty())
- {
- Writer.BeginObject("cloud"sv);
+ if (m_MaxChunkEmbedSize != 0)
{
- Writer.AddString("url"sv, m_CloudUrl);
- Writer.AddString("namespace"sv, m_CloudNamespace);
- Writer.AddString("bucket"sv, m_CloudBucket);
- Writer.AddString("key"sv, m_CloudKey);
- if (!m_CloudOpenIdProvider.empty())
- {
- Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider);
- }
- if (!m_CloudAccessToken.empty())
- {
- Writer.AddString("access-token"sv, m_CloudAccessToken);
- }
- if (!m_CloudAccessTokenEnv.empty())
+ Writer.AddInteger("maxchunkembedsize"sv, m_MaxChunkEmbedSize);
+ }
+ if (m_EmbedLooseFiles)
+ {
+ Writer.AddBool("embedloosefiles"sv, true);
+ }
+ if (m_Force)
+ {
+ Writer.AddBool("force"sv, true);
+ }
+ if (!m_FileDirectoryPath.empty())
+ {
+ Writer.BeginObject("file"sv);
{
- std::string ResolvedCloudAccessTokenEnv = zen::GetEnvVariable(m_CloudAccessTokenEnv);
-
- if (!ResolvedCloudAccessTokenEnv.empty())
+ Writer.AddString("path"sv, m_FileDirectoryPath);
+ Writer.AddString("name"sv, m_FileName);
+ if (m_DisableBlocks)
{
- Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv);
+ Writer.AddBool("disableblocks"sv, true);
}
- else
+ if (m_FileForceEnableTempBlocks)
{
- Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv);
+ Writer.AddBool("enabletempblocks"sv, true);
}
}
- if (m_CloudAssumeHttp2)
- {
- Writer.AddBool("assumehttp2"sv, true);
- }
- if (m_DisableBlocks)
- {
- Writer.AddBool("disableblocks"sv, true);
- }
- if (m_CloudDisableTempBlocks)
+ Writer.EndObject(); // "file"
+ TargetDescription = fmt::format("[file] {}/{}", m_FileDirectoryPath, m_FileName);
+ }
+ if (!m_CloudUrl.empty())
+ {
+ Writer.BeginObject("cloud"sv);
{
- Writer.AddBool("disabletempblocks"sv, true);
+ Writer.AddString("url"sv, m_CloudUrl);
+ Writer.AddString("namespace"sv, m_CloudNamespace);
+ Writer.AddString("bucket"sv, m_CloudBucket);
+ Writer.AddString("key"sv, m_CloudKey);
+ if (!m_CloudOpenIdProvider.empty())
+ {
+ Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider);
+ }
+ if (!m_CloudAccessToken.empty())
+ {
+ Writer.AddString("access-token"sv, m_CloudAccessToken);
+ }
+ if (!m_CloudAccessTokenEnv.empty())
+ {
+ std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_CloudAccessTokenEnv);
+
+ if (!ResolvedCloudAccessTokenEnv.empty())
+ {
+ Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv);
+ }
+ else
+ {
+ Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv);
+ }
+ }
+ if (m_CloudAssumeHttp2)
+ {
+ Writer.AddBool("assumehttp2"sv, true);
+ }
+ if (m_DisableBlocks)
+ {
+ Writer.AddBool("disableblocks"sv, true);
+ }
+ if (m_CloudDisableTempBlocks)
+ {
+ Writer.AddBool("disabletempblocks"sv, true);
+ }
}
+ Writer.EndObject(); // "cloud"
+ TargetDescription = fmt::format("[cloud] {}/{}/{}/{}", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey);
}
- Writer.EndObject(); // "cloud"
- TargetDescription = fmt::format("[cloud] {}/{}/{}/{}", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey);
- }
- if (!m_ZenUrl.empty())
- {
- Writer.BeginObject("zen"sv);
+ if (!m_ZenUrl.empty())
{
- Writer.AddString("url"sv, m_ZenUrl);
- Writer.AddString("project"sv, m_ZenProjectName);
- Writer.AddString("oplog"sv, m_ZenOplogName);
- }
- Writer.EndObject(); // "zen"
+ Writer.BeginObject("zen"sv);
+ {
+ Writer.AddString("url"sv, m_ZenUrl);
+ Writer.AddString("project"sv, m_ZenProjectName);
+ Writer.AddString("oplog"sv, m_ZenOplogName);
+ }
+ Writer.EndObject(); // "zen"
- TargetDescription = fmt::format("[zen] {}/{}/{}", m_ZenUrl, m_ZenProjectName, m_ZenOplogName);
+ TargetDescription = fmt::format("[zen] {}/{}/{}", m_ZenUrl, m_ZenProjectName, m_ZenOplogName);
+ }
}
- }
- Writer.EndObject(); // "params"
-
- zen::BinaryWriter MemOut;
- Writer.Save(MemOut);
- Session.SetBody(cpr::Body{(const char*)MemOut.GetData(), MemOut.GetSize()});
+ Writer.EndObject(); // "params"
+ });
ZEN_CONSOLE("Saving oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, m_HostName, TargetDescription);
- cpr::Response Response = Session.Post();
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
+
+ HttpClient Http(m_HostName);
+ if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload))
+ {
+ ZEN_CONSOLE("{}", Result);
+ return 0;
+ }
+ else
+ {
+ Result.ThrowError("failed to create project"sv);
+ return 1;
+ }
}
////////////////////////////
@@ -835,7 +843,7 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
if (m_ProjectName.empty())
@@ -861,8 +869,6 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
return 1;
}
- cpr::Session Session;
-
if (!m_CloudUrl.empty())
{
if (m_CloudNamespace.empty() || m_CloudBucket.empty())
@@ -874,7 +880,7 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_CloudKey.empty())
{
std::string KeyString = fmt::format("{}/{}/{}/{}", m_ProjectName, m_OplogName, m_CloudNamespace, m_CloudBucket);
- zen::IoHash Key = zen::IoHash::HashBuffer(KeyString.data(), KeyString.size());
+ IoHash Key = IoHash::HashBuffer(KeyString.data(), KeyString.size());
m_CloudKey = Key.ToHexString();
ZEN_WARN("Using auto generated cloud key '{}'", m_CloudKey);
}
@@ -903,123 +909,127 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
}
}
- const std::string TargetUrlBase = fmt::format("{}/prj", m_HostName);
- Session.SetUrl({fmt::format("{}/{}/oplog/{}", TargetUrlBase, m_ProjectName, m_OplogName)});
- cpr::Response Response = Session.Get();
- if (Response.status_code == static_cast<long>(zen::HttpResponseCode::NotFound))
+ HttpClient Http(m_HostName);
+ std::string Url = fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName);
+
+ bool CreateOplog = false;
+ if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)))
{
- ZEN_WARN("Automatically creating oplog '{}/{}'", m_ProjectName, m_OplogName)
- Response = Session.Post();
- if (!zen::IsHttpSuccessCode(Response.status_code))
+ if (m_ZenClean)
{
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
+ ZEN_WARN("Deleting oplog '{}/{}'", m_ProjectName, m_OplogName)
+ Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kJSON));
+ if (!Result)
+ {
+ Result.ThrowError("failed deleting existing oplog"sv);
+ return 1;
+ }
+ CreateOplog = true;
}
}
- else if (!zen::IsHttpSuccessCode(Response.status_code))
+ else if (Result.StatusCode == HttpResponseCode::NotFound)
{
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
+ CreateOplog = true;
}
- else if (m_ZenClean)
+ else
{
- ZEN_WARN("Cleaning oplog '{}/{}'", m_ProjectName, m_OplogName)
- Response = Session.Delete();
- if (!zen::IsHttpSuccessCode(Response.status_code))
- {
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
- }
- Response = Session.Post();
- if (!zen::IsHttpSuccessCode(Response.status_code))
+ Result.ThrowError("failed checking oplog"sv);
+ return 1;
+ }
+
+ if (CreateOplog)
+ {
+ ZEN_WARN("Creating oplog '{}/{}'", m_ProjectName, m_OplogName);
+ if (HttpClient::Response Result = Http.Post(Url); !Result)
{
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
+ Result.ThrowError("failed creating oplog"sv);
+ return 1;
}
}
std::string SourceDescription;
- Session.SetUrl(fmt::format("{}/{}/oplog/{}/rpc", TargetUrlBase, m_ProjectName, m_OplogName));
- Session.SetHeader({{"Content-Type", std::string(zen::MapContentTypeToString(zen::HttpContentType::kCbObject))}});
- zen::CbObjectWriter Writer;
- Writer.AddString("method"sv, "import"sv);
- Writer.BeginObject("params"sv);
- {
- if (m_Force)
- {
- Writer.AddBool("force"sv, true);
- }
- if (!m_FileDirectoryPath.empty())
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ Writer.AddString("method"sv, "import"sv);
+ Writer.BeginObject("params"sv);
{
- Writer.BeginObject("file"sv);
+ if (m_Force)
{
- Writer.AddString("path"sv, m_FileDirectoryPath);
- Writer.AddString("name"sv, m_FileName);
+ Writer.AddBool("force"sv, true);
}
- Writer.EndObject(); // "file"
- SourceDescription = fmt::format("[file] {}/{}", m_FileDirectoryPath, m_FileName);
- }
- if (!m_CloudUrl.empty())
- {
- Writer.BeginObject("cloud"sv);
+ if (!m_FileDirectoryPath.empty())
{
- Writer.AddString("url"sv, m_CloudUrl);
- Writer.AddString("namespace"sv, m_CloudNamespace);
- Writer.AddString("bucket"sv, m_CloudBucket);
- Writer.AddString("key"sv, m_CloudKey);
- if (!m_CloudOpenIdProvider.empty())
- {
- Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider);
- }
- if (!m_CloudAccessToken.empty())
+ Writer.BeginObject("file"sv);
{
- Writer.AddString("access-token"sv, m_CloudAccessToken);
+ Writer.AddString("path"sv, m_FileDirectoryPath);
+ Writer.AddString("name"sv, m_FileName);
}
- if (!m_CloudAccessTokenEnv.empty())
+ Writer.EndObject(); // "file"
+ SourceDescription = fmt::format("[file] {}/{}", m_FileDirectoryPath, m_FileName);
+ }
+ if (!m_CloudUrl.empty())
+ {
+ Writer.BeginObject("cloud"sv);
{
- std::string ResolvedCloudAccessTokenEnv = zen::GetEnvVariable(m_CloudAccessTokenEnv);
-
- if (!ResolvedCloudAccessTokenEnv.empty())
+ Writer.AddString("url"sv, m_CloudUrl);
+ Writer.AddString("namespace"sv, m_CloudNamespace);
+ Writer.AddString("bucket"sv, m_CloudBucket);
+ Writer.AddString("key"sv, m_CloudKey);
+ if (!m_CloudOpenIdProvider.empty())
{
- Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv);
+ Writer.AddString("openid-provider"sv, m_CloudOpenIdProvider);
}
- else
+ if (!m_CloudAccessToken.empty())
{
- Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv);
+ Writer.AddString("access-token"sv, m_CloudAccessToken);
+ }
+ if (!m_CloudAccessTokenEnv.empty())
+ {
+ std::string ResolvedCloudAccessTokenEnv = GetEnvVariable(m_CloudAccessTokenEnv);
+
+ if (!ResolvedCloudAccessTokenEnv.empty())
+ {
+ Writer.AddString("access-token"sv, ResolvedCloudAccessTokenEnv);
+ }
+ else
+ {
+ Writer.AddString("access-token-env"sv, m_CloudAccessTokenEnv);
+ }
+ }
+ if (m_CloudAssumeHttp2)
+ {
+ Writer.AddBool("assumehttp2"sv, true);
}
}
- if (m_CloudAssumeHttp2)
- {
- Writer.AddBool("assumehttp2"sv, true);
- }
+ Writer.EndObject(); // "cloud"
+ SourceDescription = fmt::format("[cloud] {}/{}/{}/{}", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey);
}
- Writer.EndObject(); // "cloud"
- SourceDescription = fmt::format("[cloud] {}/{}/{}/{}", m_CloudUrl, m_CloudNamespace, m_CloudBucket, m_CloudKey);
- }
- if (!m_ZenUrl.empty())
- {
- Writer.BeginObject("zen"sv);
+ if (!m_ZenUrl.empty())
{
- Writer.AddString("url"sv, m_ZenUrl);
- Writer.AddString("project"sv, m_ZenProjectName);
- Writer.AddString("oplog"sv, m_ZenOplogName);
+ Writer.BeginObject("zen"sv);
+ {
+ Writer.AddString("url"sv, m_ZenUrl);
+ Writer.AddString("project"sv, m_ZenProjectName);
+ Writer.AddString("oplog"sv, m_ZenOplogName);
+ }
+ Writer.EndObject(); // "zen"
+ SourceDescription = fmt::format("[zen] {}", m_ZenUrl);
}
- Writer.EndObject(); // "zen"
- SourceDescription = fmt::format("[zen] {}", m_ZenUrl);
}
- }
- Writer.EndObject(); // "params"
-
- zen::BinaryWriter MemOut;
- Writer.Save(MemOut);
- Session.SetBody(cpr::Body{(const char*)MemOut.GetData(), MemOut.GetSize()});
+ Writer.EndObject(); // "params"
+ });
ZEN_CONSOLE("Loading oplog '{}/{}' from '{}' to {}", m_ProjectName, m_OplogName, SourceDescription, m_HostName);
- Response = Session.Post();
-
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
+ if (HttpClient::Response Result = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload))
+ {
+ ZEN_CONSOLE("{}", Result);
+ return 0;
+ }
+ else
+ {
+ Result.ThrowError("failed to create project"sv);
+ return 1;
+ }
}
////////////////////////////
@@ -1054,7 +1064,7 @@ SnapshotOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
if (m_ProjectName.empty())
@@ -1069,26 +1079,22 @@ SnapshotOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
return 1;
}
- cpr::Session Session;
-
- const std::string TargetUrlBase = fmt::format("{}/prj", m_HostName);
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
- Session.SetUrl(fmt::format("{}/{}/oplog/{}/rpc", TargetUrlBase, m_ProjectName, m_OplogName));
- Session.SetHeader(
- {{"Accept", "application/json"}, {"Content-Type", std::string(zen::MapContentTypeToString(zen::HttpContentType::kCbObject))}});
-
- zen::CbObjectWriter Writer;
- Writer.AddString("method"sv, "snapshot"sv);
-
- zen::BinaryWriter MemOut;
- Writer.Save(MemOut);
- Session.SetBody(cpr::Body{(const char*)MemOut.GetData(), MemOut.GetSize()});
+ HttpClient Http(m_HostName);
ZEN_CONSOLE("Snapshotting oplog '{}/{}' to {}", m_ProjectName, m_OplogName, m_HostName);
- cpr::Response Response = Session.Post();
-
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- return MapHttpToCommandReturnCode(Response);
+ if (HttpClient::Response Result =
+ Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), Payload, HttpClient::Accept(ZenContentType::kJSON)))
+ {
+ ZEN_CONSOLE("{}", Result);
+ return 0;
+ }
+ else
+ {
+ Result.ThrowError("failed to create project"sv);
+ return 1;
+ }
}
////////////////////////////
@@ -1117,32 +1123,20 @@ ProjectStatsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** ar
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
- cpr::Session Session;
- Session.SetUrl({fmt::format("{}/stats/prj", m_HostName)});
- Session.SetHeader(cpr::Header{{"Accept", "application/json"}});
-
- cpr::Response Result = Session.Get();
-
- if (zen::IsHttpSuccessCode(Result.status_code))
+ HttpClient Http(m_HostName);
+ if (HttpClient::Response Result = Http.Get("/stats/prj", HttpClient::Accept(ZenContentType::kJSON)))
{
- ZEN_CONSOLE("{}", Result.text);
-
+ ZEN_CONSOLE("{}", Result.AsText());
return 0;
}
-
- if (Result.status_code)
- {
- ZEN_ERROR("Info failed: {}: {} ({})", Result.status_code, Result.reason, Result.text);
- }
else
{
- ZEN_ERROR("Info failed: {}", Result.error.message);
+ Result.ThrowError("failed to get project stats"sv);
+ return 1;
}
-
- return 1;
}
////////////////////////////
@@ -1183,30 +1177,7 @@ ProjectDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
- }
-
- cpr::Session Session;
- cpr::Parameters Parameters;
- if (m_OpDetails)
- {
- Parameters.Add({"opdetails", "true"});
- }
- if (m_Details)
- {
- Parameters.Add({"details", "true"});
- }
- if (m_AttachmentDetails)
- {
- Parameters.Add({"attachmentdetails", "true"});
- }
- if (m_CSV)
- {
- Parameters.Add({"csv", "true"});
- }
- else
- {
- Session.SetHeader(cpr::Header{{"Accept", "application/json"}});
+ throw OptionParseException("unable to resolve server specification");
}
if (!m_OpId.empty())
@@ -1217,7 +1188,6 @@ ProjectDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
ZEN_CONSOLE("{}", m_Options.help({""}).c_str());
return 1;
}
- Session.SetUrl({fmt::format("{}/prj/details$/{}/{}/{}", m_HostName, m_ProjectName, m_OplogName, m_OpId)});
}
else if (!m_OplogName.empty())
{
@@ -1227,37 +1197,44 @@ ProjectDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char**
ZEN_CONSOLE("{}", m_Options.help({""}).c_str());
return 1;
}
- Session.SetUrl({fmt::format("{}/prj/details$/{}/{}", m_HostName, m_ProjectName, m_OplogName)});
}
- else if (!m_ProjectName.empty())
+
+ HttpClient Http(m_HostName);
+
+ ExtendableStringBuilder<128> Url;
+ Url.Append("/prj/details$");
+ if (!m_ProjectName.empty())
{
- Session.SetUrl({fmt::format("{}/prj/details$/{}", m_HostName, m_ProjectName)});
+ Url.Append("/");
+ Url.Append(m_ProjectName);
}
- else
+ if (!m_OplogName.empty())
{
- Session.SetUrl({fmt::format("{}/prj/details$", m_HostName)});
+ Url.Append("/");
+ Url.Append(m_OplogName);
}
- Session.SetParameters(Parameters);
-
- cpr::Response Result = Session.Get();
-
- if (zen::IsHttpSuccessCode(Result.status_code))
+ if (!m_OpId.empty())
{
- ZEN_CONSOLE("{}", Result.text);
-
- return 0;
+ Url.Append("/");
+ Url.Append(m_OpId);
}
- if (Result.status_code)
+ if (HttpClient::Response Result =
+ Http.Get(Url,
+ m_CSV ? HttpClient::Accept(ZenContentType::kText) : HttpClient::Accept(ZenContentType::kJSON),
+ {{"opdetails", m_OpDetails ? "true" : "false"},
+ {"details", m_Details ? "true" : "false"},
+ {"attachmentdetails", m_AttachmentDetails ? "true" : "false"},
+ {"csv", m_CSV ? "true" : "false"}}))
{
- ZEN_ERROR("Info failed: {}: {} ({})", Result.status_code, Result.reason, Result.text);
+ ZEN_CONSOLE("{}", Result.AsText());
+ return 0;
}
else
{
- ZEN_ERROR("Info failed: {}", Result.error.message);
+ Result.ThrowError("failed to get project details"sv);
+ return 1;
}
-
- return 1;
}
////////////////////////////
@@ -1292,29 +1269,29 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_HostName.empty())
{
- throw zen::OptionParseException("unable to resolve server specification");
+ throw OptionParseException("unable to resolve server specification");
}
if (m_ProjectName.empty())
{
- throw zen::OptionParseException("a project must be specified");
+ throw OptionParseException("a project must be specified");
}
if (m_OplogName.empty())
{
- throw zen::OptionParseException("an oplog must be specified");
+ throw OptionParseException("an oplog must be specified");
}
if (m_MirrorRootPath.empty())
{
- throw zen::OptionParseException("a target path must be specified");
+ throw OptionParseException("a target path must be specified");
}
ZEN_CONSOLE("Emitting file data from oplog '{}'/'{}' to '{}'", m_ProjectName, m_OplogName, m_MirrorRootPath);
- zen::HttpClient Http(m_HostName);
+ HttpClient Http(m_HostName);
- if (zen::HttpClient::Response Result = Http.Get(fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName)))
+ if (HttpClient::Response Result = Http.Get(fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName)))
{
// The info requested is not really used at this moment, we just use the probe to be able to provide
// better diagnostics up front
@@ -1329,42 +1306,61 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
// Emit file data to target directory
std::filesystem::path RootPath{m_MirrorRootPath};
- zen::CreateDirectories(RootPath);
+ CreateDirectories(RootPath);
+
+ std::filesystem::path TmpPath = RootPath / ".tmp";
+ CreateDirectories(TmpPath);
+
+ std::atomic_int64_t FileCount = 0;
+ int OplogEntryCount = 0;
- int FileCount = 0;
- int OplogEntryCount = 0;
+ size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
+ WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
+ Latch WorkRemaining(1);
- auto EmitFilesForDataArray = [&](zen::CbArrayView DataArray) {
+ std::unordered_set<std::string> FileNames;
+
+ auto EmitFilesForDataArray = [&](CbArrayView DataArray) {
for (auto DataIter : DataArray)
{
- if (zen::CbObjectView Data = DataIter.AsObjectView())
+ if (CbObjectView Data = DataIter.AsObjectView())
{
- std::string_view FileName = Data["filename"sv].AsString();
- zen::Oid ChunkId = Data["id"sv].AsObjectId();
-
- if (zen::HttpClient::Response ChunkResponse =
- Http.Get(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId)))
- {
- zen::IoBuffer ChunkData = ChunkResponse.ResponsePayload;
- zen::WriteFile(RootPath / FileName, ChunkData);
-
- ++FileCount;
- }
- else
+ std::string FileName = std::string(Data["filename"sv].AsString());
+ Oid ChunkId = Data["id"sv].AsObjectId();
+ if (!FileNames.insert(FileName).second)
{
- ChunkResponse.ThrowError("chunk data fetch failed"sv);
+ continue;
}
+ WorkRemaining.AddCount(1);
+ WorkerPool.ScheduleWork([this, &RootPath, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining]() {
+ auto _ = MakeGuard([&WorkRemaining]() { WorkRemaining.CountDown(); });
+ if (HttpClient::Response ChunkResponse =
+ Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath))
+ {
+ IoBuffer ChunkData = ChunkResponse.ResponsePayload;
+ std::filesystem::path TargetPath = RootPath / FileName;
+ if (!MoveToFile(TargetPath, ChunkData))
+ {
+ WriteFile(TargetPath, ChunkData);
+ }
+ ++FileCount;
+ }
+ else
+ {
+ ZEN_CONSOLE("Unable to fetch '{}' (chunk {}). Reason: '{}'", FileName, ChunkId, ChunkResponse.ErrorMessage(""sv));
+ }
+ });
}
}
};
- if (zen::HttpClient::Response Response = Http.Get(fmt::format("/prj/{}/oplog/{}/entries"sv, m_ProjectName, m_OplogName)))
+ if (HttpClient::Response Response = Http.Get(fmt::format("/prj/{}/oplog/{}/entries"sv, m_ProjectName, m_OplogName)))
{
- if (zen::CbObject ResponseObject = Response.AsObject())
+ if (CbObject ResponseObject = Response.AsObject())
{
for (auto EntryIter : ResponseObject["entries"sv])
{
- zen::CbObjectView Entry = EntryIter.AsObjectView();
+ CbObjectView Entry = EntryIter.AsObjectView();
EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView());
EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView());
@@ -1383,8 +1379,14 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
return 1;
}
+ WorkRemaining.CountDown();
+ WorkRemaining.Wait();
- ZEN_CONSOLE("mirrored {} files from {} oplog entries successfully", FileCount, OplogEntryCount);
+ std::filesystem::remove_all(TmpPath);
+
+ ZEN_CONSOLE("mirrored {} files from {} oplog entries successfully", FileCount.load(), OplogEntryCount);
return 0;
}
+
+} // namespace zen
diff --git a/src/zen/cmds/projectstore.h b/src/zen/cmds/projectstore.h
index 6ab49becf..64779f76c 100644
--- a/src/zen/cmds/projectstore.h
+++ b/src/zen/cmds/projectstore.h
@@ -4,6 +4,8 @@
#include "../zen.h"
+namespace zen {
+
class DropProjectCommand : public ZenCmdBase
{
public:
@@ -246,3 +248,5 @@ private:
std::string m_OplogName;
std::string m_MirrorRootPath;
};
+
+} // namespace zen
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index f1cfe9796..10478cf44 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -439,7 +439,7 @@ main(int argc, char** argv)
}
catch (std::exception& Ex)
{
- printf("Exception caught from 'main': %s\n", Ex.what());
+ printf("Error: %s\n", Ex.what());
return 10;
}