diff options
| author | Dan Engelbrecht <[email protected]> | 2025-11-07 12:27:44 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-11-07 12:27:44 +0100 |
| commit | 72b1797e2b65ad47f4dc8e9fab73b9aa170889b4 (patch) | |
| tree | 93c9f7c99965393ba9c6ec86c01b63899bfba684 /src/zen/cmds/projectstore_cmd.cpp | |
| parent | move progress bar to separate file (#638) (diff) | |
| download | archived-zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.tar.xz archived-zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.zip | |
get oplog attachments (#622)
* add support for downloading individual attachments from an oplog
Diffstat (limited to 'src/zen/cmds/projectstore_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 319 |
1 files changed, 209 insertions, 110 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 6df317823..fe8c0d675 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -9,6 +9,7 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/parallelwork.h> #include <zencore/process.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> @@ -19,11 +20,17 @@ #include <zenhttp/httpclient.h> #include <zenhttp/httpclientauth.h> #include <zenhttp/httpcommon.h> +#include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/builds/buildstorageutil.h> #include <zenremotestore/builds/jupiterbuildstorage.h> #include <zenremotestore/jupiter/jupiterhost.h> +#include <zenremotestore/operationlogoutput.h> +#include <zenremotestore/projectstore/projectstoreoperations.h> +#include <zenremotestore/projectstore/remoteprojectstore.h> +#include <zenutil/workerpools.h> #include "../progressbar.h" +#include "../threadworkers.h" ZEN_THIRD_PARTY_INCLUDES_START #include <json11.hpp> @@ -2212,9 +2219,6 @@ OplogDownloadCommand::OplogDownloadCommand() m_Options.add_option("", "", "system-dir", "Specify system root", cxxopts::value(m_SystemRootDir), "<systemdir>"); - m_Options.add_option("output", "", "quiet", "Suppress non-essential output", cxxopts::value(m_Quiet), "<quiet>"); - m_Options.add_option("", "y", "yes", "Don't query for confirmation", cxxopts::value(m_Yes), "<yes>"); - auto AddCloudOptions = [this](cxxopts::Options& Ops) { m_AuthOptions.AddOptions(Ops); @@ -2232,16 +2236,58 @@ OplogDownloadCommand::OplogDownloadCommand() Ops.add_option("cloud build", "", "bucket", "Builds Storage bucket", cxxopts::value(m_Bucket), "<bucket>"); }; + auto AddCacheOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>"); + }; + AddCloudOptions(m_Options); + AddCacheOptions(m_Options); + + auto AddOutputOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("", "y", "yes", "Don't query for confirmation", cxxopts::value(m_Yes), "<yes>"); + Ops.add_option("output", + "", + "plain-progress", + "Show progress using plain output", + cxxopts::value(m_PlainProgress), + "<plainprogress>"); + Ops.add_option("output", + "", + "log-progress", + "Write @progress style progress to output", + cxxopts::value(m_LogProgress), + "<logprogress>"); + Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>"); + Ops.add_option("output", "", "quiet", "Suppress non-essential output", cxxopts::value(m_Quiet), "<quiet>"); + }; + AddOutputOptions(m_Options); m_Options.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); + + m_Options.add_option("", "", "force", "Force download and disregard local cache", cxxopts::value(m_ForceDownload), "<force>"); + m_Options.add_option("", + "", + "boost-workers", + "Increase the number of worker threads - may cause computer to be less responsive", + cxxopts::value(m_BoostWorkerThreads), + "<boostworkers>"); + m_Options.add_option("", "", "decompress", "Decompress downloaded attachment", cxxopts::value(m_DecompressAttachments), "<decompress>"); + m_Options.add_option("", "", "output-path", "Path to oplog output, extension .json or .cb (compact binary). Default is output to console", - cxxopts::value(m_OutputPath), + cxxopts::value(m_OplogOutputPath), "<path>"); + m_Options.add_option("", + "", + "attachments", + "Comma separated list of attachments in RawHash for to download", + cxxopts::value(m_Attachments), + "<attachments>"); + m_Options.add_option("", "", "attachments-path", "Path to folder to write attachments to", cxxopts::value(m_AttachmentsPath), "<path>"); + m_Options.parse_positional({"cloud-url", "output-path"}); m_Options.positional_help("[<cloud-url> <output-path>]"); } @@ -2274,6 +2320,49 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a }; ParseSystemOptions(); + ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty; + + auto ParseOutputOptions = [&]() { + if (m_Verbose && m_Quiet) + { + throw OptionParseException("'--verbose' conflicts with '--quiet'", m_Options.help()); + } + if (m_LogProgress && m_PlainProgress) + { + throw OptionParseException("'--plain-progress' conflicts with '--log-progress'", m_Options.help()); + } + if (m_LogProgress && m_Quiet) + { + throw OptionParseException("'--quiet' conflicts with '--log-progress'", m_Options.help()); + } + if (m_PlainProgress && m_Quiet) + { + throw OptionParseException("'--quiet' conflicts with '--plain-progress'", m_Options.help()); + } + + if (m_LogProgress) + { + ProgressMode = ProgressBar::Mode::Log; + } + else if (m_PlainProgress) + { + ProgressMode = ProgressBar::Mode::Plain; + } + else if (m_Verbose) + { + ProgressMode = ProgressBar::Mode::Plain; + } + else if (m_Quiet) + { + ProgressMode = ProgressBar::Mode::Quiet; + } + else + { + ProgressMode = ProgressBar::Mode::Pretty; + } + }; + ParseOutputOptions(); + auto ParseStorageOptions = [&](bool RequireNamespace, bool RequireBucket) { if (!m_Url.empty()) { @@ -2316,13 +2405,25 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } }; + ParseStorageOptions(/*RequireNamespace*/ true, /*RequireBucket*/ true); + + ThreadWorkers Workers(m_BoostWorkerThreads, /*SingleThreaded*/ false); + if (!m_Quiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } + std::unique_ptr<AuthMgr> Auth; HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", .AssumeHttp2 = m_AssumeHttp2, .AllowResume = true, .RetryCount = 2}; - ParseStorageOptions(/*RequireNamespace*/ true, /*RequireBucket*/ true); + Oid BuildId = Oid::TryFromHexString(m_BuildId); + if (BuildId == Oid::Zero) + { + throw OptionParseException(fmt::format("'--build-id' ('{}') is malformed", m_BuildId), m_Options.help()); + } m_AuthOptions.ParseOptions(m_Options, m_SystemRootDir, @@ -2332,138 +2433,141 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a m_Quiet, /*Hidden*/ false); - BuildStorageResolveResult ResolveRes = ResolveBuildStorage(ClientSettings, m_Host, m_OverrideHost, ""sv, ZenCacheResolveMode::Off); + BuildStorageResolveResult ResolveRes = + ResolveBuildStorage(ClientSettings, m_Host, m_OverrideHost, m_ZenCacheHost, ZenCacheResolveMode::Discovery); -#if 0 - std::string BuildStorageName = ZEN_CLOUD_STORAGE; + BuildStorageBase::Statistics StorageStats; - std::string CloudHost; + StorageInstance Storage; - if (m_OverrideHost.empty()) - { - JupiterServerDiscovery Response = DiscoverJupiterEndpoints(m_Host, ClientSettings); + Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings); - if (Response.ServerEndPoints.empty()) - { - throw std::runtime_error(fmt::format("Failed to find any builds hosts at {}", m_Host)); - } - for (const JupiterServerDiscovery::EndPoint& ServerEndpoint : Response.ServerEndPoints) - { - if (!ServerEndpoint.BaseUrl.empty()) - { - if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(ServerEndpoint.BaseUrl, ServerEndpoint.AssumeHttp2); - TestResult.Success) - { - CloudHost = ServerEndpoint.BaseUrl; - m_AssumeHttp2 = ServerEndpoint.AssumeHttp2; - BuildStorageName = ServerEndpoint.Name; - break; - } - else - { - ZEN_DEBUG("Unable to reach host {}. Reason: {}", ServerEndpoint.BaseUrl, TestResult.FailureReason); - } - } - } - if (CloudHost.empty()) - { - throw std::runtime_error( - fmt::format("Failed to find any usable builds hosts out of {} using {}", Response.ServerEndPoints.size(), m_Host)); - } - } - else if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(m_OverrideHost, m_AssumeHttp2); TestResult.Success) - { - CloudHost = m_OverrideHost; - } - else - { - throw std::runtime_error(fmt::format("Host {} could not be reached. Reason: {}", m_OverrideHost, TestResult.FailureReason)); - } -#endif // 0 + BuildStorageCache::Statistics StorageCacheStats; - Oid BuildId = Oid::TryFromHexString(m_BuildId); - if (BuildId == Oid::Zero) + std::atomic<bool> AbortFlag(false); + + if (!ResolveRes.CacheUrl.empty()) { - throw OptionParseException("'--build-id' is malformed", m_Options.help()); + Storage.CacheHttp = std::make_unique<HttpClient>(ResolveRes.CacheUrl, + HttpClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, + .AllowResume = true, + .RetryCount = 0}, + [&AbortFlag]() { return AbortFlag.load(); }); + Storage.CacheName = ResolveRes.CacheName; } - BuildStorageBase::Statistics StorageStats; - HttpClient BuildStorageHttp(ResolveRes.HostUrl, ClientSettings); - if (!m_Quiet) { std::string StorageDescription = - fmt::format("Cloud {}{}. Namespace '{}', Bucket '{}'", + fmt::format("Cloud {}{}. SessionId {}. Namespace '{}', Bucket '{}'", ResolveRes.HostName, (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl), + Storage.BuildStorageHttp->GetSessionId(), m_Namespace, m_Bucket); ZEN_CONSOLE("Remote: {}", StorageDescription); - } - - std::filesystem::path StorageTempPath = std::filesystem::temp_directory_path() / ("zen_" + Oid::NewOid().ToString()); - std::unique_ptr<BuildStorageBase> BuildStorage = - CreateJupiterBuildStorage(Log(), BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath); + if (Storage.CacheHttp) + { + std::string CacheDescription = + fmt::format("Zen {}{}. SessionId {}. Namespace '{}', Bucket '{}'", + ResolveRes.CacheName, + (ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl), + Storage.CacheHttp->GetSessionId(), + m_Namespace, + m_Bucket); - Stopwatch Timer; - CbObject BuildObject = BuildStorage->GetBuild(BuildId); - if (!m_Quiet) - { - ZEN_CONSOLE("Fetched {}/{}/{}/{} in {}", m_Url, m_Namespace, m_Bucket, BuildId, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + ZEN_CONSOLE("Cache : {}", CacheDescription); + } } - Timer.Reset(); + std::string FullBuildKey = fmt::format("{}_{}_{}", m_Namespace, m_Bucket, m_BuildId); + IoHash FullBuildKeyHash = IoHash::HashBuffer(FullBuildKey.data(), FullBuildKey.length()); - CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); - if (!PartsObject) - { - throw std::runtime_error( - fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, m_Url, m_Namespace, m_Bucket, m_BuildId)); - } + std::filesystem::path StorageTempPath = std::filesystem::temp_directory_path() / ("zen_" + FullBuildKeyHash.ToHexString()); - static const std::string_view OplogContainerPartName = "oplogcontainer"sv; + Storage.BuildStorage = + CreateJupiterBuildStorage(Log(), *Storage.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath); + Storage.StorageName = ResolveRes.HostName; - const Oid OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); - if (OplogBuildPartId == Oid::Zero) + if (Storage.CacheHttp) { - throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - OplogContainerPartName)); + Storage.BuildCacheStorage = CreateZenBuildStorageCache( + *Storage.CacheHttp, + StorageCacheStats, + m_Namespace, + m_Bucket, + StorageTempPath / "zencache", + false ? GetSmallWorkerPool(EWorkloadType::Background) : GetTinyWorkerPool(EWorkloadType::Background)); } - CbObject ContainerObject = BuildStorage->GetBuildPart(BuildId, OplogBuildPartId); + std::unique_ptr<OperationLogOutput> OperationLogOutput(CreateConsoleLogOutput(ProgressMode)); - MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); - IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); - IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); + ProjectStoreOperationOplogState State( + *OperationLogOutput, + Storage, + BuildId, + {.IsQuiet = m_Quiet, .IsVerbose = m_Verbose, .ForceDownload = m_ForceDownload, .TempFolderPath = StorageTempPath}); - CbValidateError ValidateResult = CbValidateError::None; - if (CbObject SectionObject = ValidateAndReadCompactBinaryObject(std::move(SectionPayload), ValidateResult); - ValidateResult == CbValidateError::None && ContainerObject) + const Oid OplogBuildPartId = State.GetBuildPartId(); + + if (!m_Attachments.empty()) { - if (!m_Quiet) + if (m_AttachmentsPath.empty()) { - ZEN_CONSOLE("Decompressed and validated oplog payload {} -> {} in {}", - NiceBytes(OpsSection.GetSize()), - NiceBytes(SectionObject.GetSize()), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + throw OptionParseException("'--attachments-path' is required when '--attachments' is given", m_Options.help()); } - if (m_OutputPath.empty()) + std::filesystem::path AttachmentsPath = MakeSafeAbsolutePath(m_AttachmentsPath); + CreateDirectories(AttachmentsPath); + std::vector<IoHash> AttachmentHashes; + AttachmentHashes.reserve(m_Attachments.size()); + for (const std::string& Attachment : m_Attachments) + { + IoHash RawHash; + if (!IoHash::TryParse(Attachment, RawHash)) + { + throw OptionParseException(fmt::format("'--attachments' ('{}') is malformed", Attachment), m_Options.help()); + } + AttachmentHashes.push_back(RawHash); + } + + std::atomic<bool> PauseFlag; + ProjectStoreOperationDownloadAttachments Op(*OperationLogOutput, + Storage, + AbortFlag, + PauseFlag, + Workers.GetIOWorkerPool(), + Workers.GetNetworkPool(), + State, + AttachmentHashes, + {.IsQuiet = m_Quiet, + .IsVerbose = m_Verbose, + .ForceDownload = m_ForceDownload, + .DecompressAttachments = m_DecompressAttachments, + .TempFolderPath = StorageTempPath, + .AttachmentOutputPath = m_AttachmentsPath}); + + Op.Execute(); + } + else + { + CbObjectView OpsSectionObject = State.LoadOpsSectionObject(); + + if (m_OplogOutputPath.empty()) { if (!m_Yes) { - if (OpsSection.GetSize() > 8u * 1024u * 1024u) + if (OpsSectionObject.GetSize() > 8u * 1024u * 1024u) { while (!m_Yes) { const std::string Prompt = fmt::format("Do you want to output an oplog of size {} to console? (yes/no) ", - NiceBytes(SectionObject.GetSize())); + NiceBytes(OpsSectionObject.GetSize())); printf("%s", Prompt.c_str()); std::string Reponse; std::getline(std::cin, Reponse); @@ -2480,7 +2584,7 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } } ExtendableStringBuilder<1024> SB; - SectionObject.ToJson(SB); + OpsSectionObject.ToJson(SB); ForEachStrTok(SB.ToView(), '\n', [](std::string_view Row) { ZEN_CONSOLE("{}", Row); return true; @@ -2488,17 +2592,17 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } else { - Timer.Reset(); - const std::string Extension = ToLower(m_OutputPath.extension().string()); + Stopwatch Timer; + const std::string Extension = ToLower(m_OplogOutputPath.extension().string()); if (Extension == ".cb" || Extension == ".cbo") { - WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, SectionObject.GetView().GetData(), SectionObject.GetSize())); + WriteFile(m_OplogOutputPath, IoBuffer(IoBuffer::Wrap, OpsSectionObject.GetView().GetData(), OpsSectionObject.GetSize())); } else if (Extension == ".json") { ExtendableStringBuilder<1024> SB; - SectionObject.ToJson(SB); - WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); + OpsSectionObject.ToJson(SB); + WriteFile(m_OplogOutputPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); } else { @@ -2507,17 +2611,12 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a if (!m_Quiet) { ZEN_CONSOLE("Wrote {} to '{}' in {}", - NiceBytes(FileSizeFromPath(m_OutputPath)), - m_OutputPath, + NiceBytes(FileSizeFromPath(m_OplogOutputPath)), + m_OplogOutputPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } } } - else - { - throw std::runtime_error( - fmt::format("Failed to parse oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult))); - } } } // namespace zen |