aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/projectstore_cmd.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-07 12:27:44 +0100
committerGitHub Enterprise <[email protected]>2025-11-07 12:27:44 +0100
commit72b1797e2b65ad47f4dc8e9fab73b9aa170889b4 (patch)
tree93c9f7c99965393ba9c6ec86c01b63899bfba684 /src/zen/cmds/projectstore_cmd.cpp
parentmove progress bar to separate file (#638) (diff)
downloadarchived-zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.tar.xz
archived-zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.zip
get oplog attachments (#622)
* add support for downloading individual attachments from an oplog
Diffstat (limited to 'src/zen/cmds/projectstore_cmd.cpp')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp319
1 files changed, 209 insertions, 110 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 6df317823..fe8c0d675 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -9,6 +9,7 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/parallelwork.h>
#include <zencore/process.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
@@ -19,11 +20,17 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/httpclientauth.h>
#include <zenhttp/httpcommon.h>
+#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/builds/buildstorageutil.h>
#include <zenremotestore/builds/jupiterbuildstorage.h>
#include <zenremotestore/jupiter/jupiterhost.h>
+#include <zenremotestore/operationlogoutput.h>
+#include <zenremotestore/projectstore/projectstoreoperations.h>
+#include <zenremotestore/projectstore/remoteprojectstore.h>
+#include <zenutil/workerpools.h>
#include "../progressbar.h"
+#include "../threadworkers.h"
ZEN_THIRD_PARTY_INCLUDES_START
#include <json11.hpp>
@@ -2212,9 +2219,6 @@ OplogDownloadCommand::OplogDownloadCommand()
m_Options.add_option("", "", "system-dir", "Specify system root", cxxopts::value(m_SystemRootDir), "<systemdir>");
- m_Options.add_option("output", "", "quiet", "Suppress non-essential output", cxxopts::value(m_Quiet), "<quiet>");
- m_Options.add_option("", "y", "yes", "Don't query for confirmation", cxxopts::value(m_Yes), "<yes>");
-
auto AddCloudOptions = [this](cxxopts::Options& Ops) {
m_AuthOptions.AddOptions(Ops);
@@ -2232,16 +2236,58 @@ OplogDownloadCommand::OplogDownloadCommand()
Ops.add_option("cloud build", "", "bucket", "Builds Storage bucket", cxxopts::value(m_Bucket), "<bucket>");
};
+ auto AddCacheOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>");
+ };
+
AddCloudOptions(m_Options);
+ AddCacheOptions(m_Options);
+
+ auto AddOutputOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("", "y", "yes", "Don't query for confirmation", cxxopts::value(m_Yes), "<yes>");
+ Ops.add_option("output",
+ "",
+ "plain-progress",
+ "Show progress using plain output",
+ cxxopts::value(m_PlainProgress),
+ "<plainprogress>");
+ Ops.add_option("output",
+ "",
+ "log-progress",
+ "Write @progress style progress to output",
+ cxxopts::value(m_LogProgress),
+ "<logprogress>");
+ Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>");
+ Ops.add_option("output", "", "quiet", "Suppress non-essential output", cxxopts::value(m_Quiet), "<quiet>");
+ };
+ AddOutputOptions(m_Options);
m_Options.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
+
+ m_Options.add_option("", "", "force", "Force download and disregard local cache", cxxopts::value(m_ForceDownload), "<force>");
+ m_Options.add_option("",
+ "",
+ "boost-workers",
+ "Increase the number of worker threads - may cause computer to be less responsive",
+ cxxopts::value(m_BoostWorkerThreads),
+ "<boostworkers>");
+ m_Options.add_option("", "", "decompress", "Decompress downloaded attachment", cxxopts::value(m_DecompressAttachments), "<decompress>");
+
m_Options.add_option("",
"",
"output-path",
"Path to oplog output, extension .json or .cb (compact binary). Default is output to console",
- cxxopts::value(m_OutputPath),
+ cxxopts::value(m_OplogOutputPath),
"<path>");
+ m_Options.add_option("",
+ "",
+ "attachments",
+ "Comma separated list of attachments in RawHash for to download",
+ cxxopts::value(m_Attachments),
+ "<attachments>");
+ m_Options.add_option("", "", "attachments-path", "Path to folder to write attachments to", cxxopts::value(m_AttachmentsPath), "<path>");
+
m_Options.parse_positional({"cloud-url", "output-path"});
m_Options.positional_help("[<cloud-url> <output-path>]");
}
@@ -2274,6 +2320,49 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
};
ParseSystemOptions();
+ ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty;
+
+ auto ParseOutputOptions = [&]() {
+ if (m_Verbose && m_Quiet)
+ {
+ throw OptionParseException("'--verbose' conflicts with '--quiet'", m_Options.help());
+ }
+ if (m_LogProgress && m_PlainProgress)
+ {
+ throw OptionParseException("'--plain-progress' conflicts with '--log-progress'", m_Options.help());
+ }
+ if (m_LogProgress && m_Quiet)
+ {
+ throw OptionParseException("'--quiet' conflicts with '--log-progress'", m_Options.help());
+ }
+ if (m_PlainProgress && m_Quiet)
+ {
+ throw OptionParseException("'--quiet' conflicts with '--plain-progress'", m_Options.help());
+ }
+
+ if (m_LogProgress)
+ {
+ ProgressMode = ProgressBar::Mode::Log;
+ }
+ else if (m_PlainProgress)
+ {
+ ProgressMode = ProgressBar::Mode::Plain;
+ }
+ else if (m_Verbose)
+ {
+ ProgressMode = ProgressBar::Mode::Plain;
+ }
+ else if (m_Quiet)
+ {
+ ProgressMode = ProgressBar::Mode::Quiet;
+ }
+ else
+ {
+ ProgressMode = ProgressBar::Mode::Pretty;
+ }
+ };
+ ParseOutputOptions();
+
auto ParseStorageOptions = [&](bool RequireNamespace, bool RequireBucket) {
if (!m_Url.empty())
{
@@ -2316,13 +2405,25 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
}
};
+ ParseStorageOptions(/*RequireNamespace*/ true, /*RequireBucket*/ true);
+
+ ThreadWorkers Workers(m_BoostWorkerThreads, /*SingleThreaded*/ false);
+ if (!m_Quiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
+
std::unique_ptr<AuthMgr> Auth;
HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient",
.AssumeHttp2 = m_AssumeHttp2,
.AllowResume = true,
.RetryCount = 2};
- ParseStorageOptions(/*RequireNamespace*/ true, /*RequireBucket*/ true);
+ Oid BuildId = Oid::TryFromHexString(m_BuildId);
+ if (BuildId == Oid::Zero)
+ {
+ throw OptionParseException(fmt::format("'--build-id' ('{}') is malformed", m_BuildId), m_Options.help());
+ }
m_AuthOptions.ParseOptions(m_Options,
m_SystemRootDir,
@@ -2332,138 +2433,141 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
m_Quiet,
/*Hidden*/ false);
- BuildStorageResolveResult ResolveRes = ResolveBuildStorage(ClientSettings, m_Host, m_OverrideHost, ""sv, ZenCacheResolveMode::Off);
+ BuildStorageResolveResult ResolveRes =
+ ResolveBuildStorage(ClientSettings, m_Host, m_OverrideHost, m_ZenCacheHost, ZenCacheResolveMode::Discovery);
-#if 0
- std::string BuildStorageName = ZEN_CLOUD_STORAGE;
+ BuildStorageBase::Statistics StorageStats;
- std::string CloudHost;
+ StorageInstance Storage;
- if (m_OverrideHost.empty())
- {
- JupiterServerDiscovery Response = DiscoverJupiterEndpoints(m_Host, ClientSettings);
+ Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings);
- if (Response.ServerEndPoints.empty())
- {
- throw std::runtime_error(fmt::format("Failed to find any builds hosts at {}", m_Host));
- }
- for (const JupiterServerDiscovery::EndPoint& ServerEndpoint : Response.ServerEndPoints)
- {
- if (!ServerEndpoint.BaseUrl.empty())
- {
- if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(ServerEndpoint.BaseUrl, ServerEndpoint.AssumeHttp2);
- TestResult.Success)
- {
- CloudHost = ServerEndpoint.BaseUrl;
- m_AssumeHttp2 = ServerEndpoint.AssumeHttp2;
- BuildStorageName = ServerEndpoint.Name;
- break;
- }
- else
- {
- ZEN_DEBUG("Unable to reach host {}. Reason: {}", ServerEndpoint.BaseUrl, TestResult.FailureReason);
- }
- }
- }
- if (CloudHost.empty())
- {
- throw std::runtime_error(
- fmt::format("Failed to find any usable builds hosts out of {} using {}", Response.ServerEndPoints.size(), m_Host));
- }
- }
- else if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(m_OverrideHost, m_AssumeHttp2); TestResult.Success)
- {
- CloudHost = m_OverrideHost;
- }
- else
- {
- throw std::runtime_error(fmt::format("Host {} could not be reached. Reason: {}", m_OverrideHost, TestResult.FailureReason));
- }
-#endif // 0
+ BuildStorageCache::Statistics StorageCacheStats;
- Oid BuildId = Oid::TryFromHexString(m_BuildId);
- if (BuildId == Oid::Zero)
+ std::atomic<bool> AbortFlag(false);
+
+ if (!ResolveRes.CacheUrl.empty())
{
- throw OptionParseException("'--build-id' is malformed", m_Options.help());
+ Storage.CacheHttp = std::make_unique<HttpClient>(ResolveRes.CacheUrl,
+ HttpClientSettings{.LogCategory = "httpcacheclient",
+ .ConnectTimeout = std::chrono::milliseconds{3000},
+ .Timeout = std::chrono::milliseconds{30000},
+ .AssumeHttp2 = ResolveRes.CacheAssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0},
+ [&AbortFlag]() { return AbortFlag.load(); });
+ Storage.CacheName = ResolveRes.CacheName;
}
- BuildStorageBase::Statistics StorageStats;
- HttpClient BuildStorageHttp(ResolveRes.HostUrl, ClientSettings);
-
if (!m_Quiet)
{
std::string StorageDescription =
- fmt::format("Cloud {}{}. Namespace '{}', Bucket '{}'",
+ fmt::format("Cloud {}{}. SessionId {}. Namespace '{}', Bucket '{}'",
ResolveRes.HostName,
(ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl),
+ Storage.BuildStorageHttp->GetSessionId(),
m_Namespace,
m_Bucket);
ZEN_CONSOLE("Remote: {}", StorageDescription);
- }
-
- std::filesystem::path StorageTempPath = std::filesystem::temp_directory_path() / ("zen_" + Oid::NewOid().ToString());
- std::unique_ptr<BuildStorageBase> BuildStorage =
- CreateJupiterBuildStorage(Log(), BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath);
+ if (Storage.CacheHttp)
+ {
+ std::string CacheDescription =
+ fmt::format("Zen {}{}. SessionId {}. Namespace '{}', Bucket '{}'",
+ ResolveRes.CacheName,
+ (ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl),
+ Storage.CacheHttp->GetSessionId(),
+ m_Namespace,
+ m_Bucket);
- Stopwatch Timer;
- CbObject BuildObject = BuildStorage->GetBuild(BuildId);
- if (!m_Quiet)
- {
- ZEN_CONSOLE("Fetched {}/{}/{}/{} in {}", m_Url, m_Namespace, m_Bucket, BuildId, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ ZEN_CONSOLE("Cache : {}", CacheDescription);
+ }
}
- Timer.Reset();
+ std::string FullBuildKey = fmt::format("{}_{}_{}", m_Namespace, m_Bucket, m_BuildId);
+ IoHash FullBuildKeyHash = IoHash::HashBuffer(FullBuildKey.data(), FullBuildKey.length());
- CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
- if (!PartsObject)
- {
- throw std::runtime_error(
- fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, m_Url, m_Namespace, m_Bucket, m_BuildId));
- }
+ std::filesystem::path StorageTempPath = std::filesystem::temp_directory_path() / ("zen_" + FullBuildKeyHash.ToHexString());
- static const std::string_view OplogContainerPartName = "oplogcontainer"sv;
+ Storage.BuildStorage =
+ CreateJupiterBuildStorage(Log(), *Storage.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath);
+ Storage.StorageName = ResolveRes.HostName;
- const Oid OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
- if (OplogBuildPartId == Oid::Zero)
+ if (Storage.CacheHttp)
{
- throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv,
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- OplogContainerPartName));
+ Storage.BuildCacheStorage = CreateZenBuildStorageCache(
+ *Storage.CacheHttp,
+ StorageCacheStats,
+ m_Namespace,
+ m_Bucket,
+ StorageTempPath / "zencache",
+ false ? GetSmallWorkerPool(EWorkloadType::Background) : GetTinyWorkerPool(EWorkloadType::Background));
}
- CbObject ContainerObject = BuildStorage->GetBuildPart(BuildId, OplogBuildPartId);
+ std::unique_ptr<OperationLogOutput> OperationLogOutput(CreateConsoleLogOutput(ProgressMode));
- MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
- IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
- IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
+ ProjectStoreOperationOplogState State(
+ *OperationLogOutput,
+ Storage,
+ BuildId,
+ {.IsQuiet = m_Quiet, .IsVerbose = m_Verbose, .ForceDownload = m_ForceDownload, .TempFolderPath = StorageTempPath});
- CbValidateError ValidateResult = CbValidateError::None;
- if (CbObject SectionObject = ValidateAndReadCompactBinaryObject(std::move(SectionPayload), ValidateResult);
- ValidateResult == CbValidateError::None && ContainerObject)
+ const Oid OplogBuildPartId = State.GetBuildPartId();
+
+ if (!m_Attachments.empty())
{
- if (!m_Quiet)
+ if (m_AttachmentsPath.empty())
{
- ZEN_CONSOLE("Decompressed and validated oplog payload {} -> {} in {}",
- NiceBytes(OpsSection.GetSize()),
- NiceBytes(SectionObject.GetSize()),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ throw OptionParseException("'--attachments-path' is required when '--attachments' is given", m_Options.help());
}
- if (m_OutputPath.empty())
+ std::filesystem::path AttachmentsPath = MakeSafeAbsolutePath(m_AttachmentsPath);
+ CreateDirectories(AttachmentsPath);
+ std::vector<IoHash> AttachmentHashes;
+ AttachmentHashes.reserve(m_Attachments.size());
+ for (const std::string& Attachment : m_Attachments)
+ {
+ IoHash RawHash;
+ if (!IoHash::TryParse(Attachment, RawHash))
+ {
+ throw OptionParseException(fmt::format("'--attachments' ('{}') is malformed", Attachment), m_Options.help());
+ }
+ AttachmentHashes.push_back(RawHash);
+ }
+
+ std::atomic<bool> PauseFlag;
+ ProjectStoreOperationDownloadAttachments Op(*OperationLogOutput,
+ Storage,
+ AbortFlag,
+ PauseFlag,
+ Workers.GetIOWorkerPool(),
+ Workers.GetNetworkPool(),
+ State,
+ AttachmentHashes,
+ {.IsQuiet = m_Quiet,
+ .IsVerbose = m_Verbose,
+ .ForceDownload = m_ForceDownload,
+ .DecompressAttachments = m_DecompressAttachments,
+ .TempFolderPath = StorageTempPath,
+ .AttachmentOutputPath = m_AttachmentsPath});
+
+ Op.Execute();
+ }
+ else
+ {
+ CbObjectView OpsSectionObject = State.LoadOpsSectionObject();
+
+ if (m_OplogOutputPath.empty())
{
if (!m_Yes)
{
- if (OpsSection.GetSize() > 8u * 1024u * 1024u)
+ if (OpsSectionObject.GetSize() > 8u * 1024u * 1024u)
{
while (!m_Yes)
{
const std::string Prompt = fmt::format("Do you want to output an oplog of size {} to console? (yes/no) ",
- NiceBytes(SectionObject.GetSize()));
+ NiceBytes(OpsSectionObject.GetSize()));
printf("%s", Prompt.c_str());
std::string Reponse;
std::getline(std::cin, Reponse);
@@ -2480,7 +2584,7 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
}
}
ExtendableStringBuilder<1024> SB;
- SectionObject.ToJson(SB);
+ OpsSectionObject.ToJson(SB);
ForEachStrTok(SB.ToView(), '\n', [](std::string_view Row) {
ZEN_CONSOLE("{}", Row);
return true;
@@ -2488,17 +2592,17 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
}
else
{
- Timer.Reset();
- const std::string Extension = ToLower(m_OutputPath.extension().string());
+ Stopwatch Timer;
+ const std::string Extension = ToLower(m_OplogOutputPath.extension().string());
if (Extension == ".cb" || Extension == ".cbo")
{
- WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, SectionObject.GetView().GetData(), SectionObject.GetSize()));
+ WriteFile(m_OplogOutputPath, IoBuffer(IoBuffer::Wrap, OpsSectionObject.GetView().GetData(), OpsSectionObject.GetSize()));
}
else if (Extension == ".json")
{
ExtendableStringBuilder<1024> SB;
- SectionObject.ToJson(SB);
- WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
+ OpsSectionObject.ToJson(SB);
+ WriteFile(m_OplogOutputPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
}
else
{
@@ -2507,17 +2611,12 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
if (!m_Quiet)
{
ZEN_CONSOLE("Wrote {} to '{}' in {}",
- NiceBytes(FileSizeFromPath(m_OutputPath)),
- m_OutputPath,
+ NiceBytes(FileSizeFromPath(m_OplogOutputPath)),
+ m_OplogOutputPath,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
}
}
- else
- {
- throw std::runtime_error(
- fmt::format("Failed to parse oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult)));
- }
}
} // namespace zen