diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 113 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.h | 17 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 131 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.h | 15 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 10 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h | 1 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/transferthreadworkers.h (renamed from src/zen/threadworkers.h) | 10 | ||||
| -rw-r--r-- | src/zenremotestore/projectstore/buildsremoteprojectstore.cpp | 29 | ||||
| -rw-r--r-- | src/zenremotestore/transferthreadworkers.cpp (renamed from src/zen/threadworkers.cpp) | 6 | ||||
| -rw-r--r-- | src/zenserver/storage/projectstore/httpprojectstore.cpp | 68 | ||||
| -rw-r--r-- | src/zenserver/storage/projectstore/httpprojectstore.h | 27 |
11 files changed, 311 insertions, 116 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 6a97ab542..a2b2fb0f6 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -37,12 +37,12 @@ #include <zenremotestore/filesystemutils.h> #include <zenremotestore/jupiter/jupiterhost.h> #include <zenremotestore/operationlogoutput.h> +#include <zenremotestore/transferthreadworkers.h> #include <zenutil/wildcard.h> #include <zenutil/workerpools.h> #include <zenutil/zenserverprocess.h> #include "../progressbar.h" -#include "../threadworkers.h" #include <signal.h> #include <memory> @@ -284,9 +284,9 @@ namespace { ); - static uint64_t GetMaxMemoryBufferSize(bool BoostWorkerMemory) + static uint64_t GetMaxMemoryBufferSize(size_t MaxBlockSize, bool BoostWorkerMemory) { - return BoostWorkerMemory ? DefaultSplitAndCompressChunkSize : 1024u * 1024u; + return BoostWorkerMemory ? (MaxBlockSize + 16u * 1024u) : 1024u * 1024u; } bool IncludePath(std::span<const std::string> IncludeWildcards, @@ -394,7 +394,7 @@ namespace { } void ValidateBuildPart(OperationLogOutput& Output, - ThreadWorkers& Workers, + TransferThreadWorkers& Workers, BuildStorageBase& Storage, const Oid& BuildId, Oid BuildPartId, @@ -428,7 +428,7 @@ namespace { } void UploadFolder(OperationLogOutput& Output, - ThreadWorkers& Workers, + TransferThreadWorkers& Workers, StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, @@ -682,7 +682,7 @@ namespace { uint64_t VerifyElapsedWallTimeUs = 0; }; - void VerifyFolder(ThreadWorkers& Workers, + void VerifyFolder(TransferThreadWorkers& Workers, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const std::filesystem::path& Path, @@ -875,7 +875,7 @@ namespace { } } - FolderContent GetValidFolderContent(ThreadWorkers& Workers, + FolderContent GetValidFolderContent(TransferThreadWorkers& Workers, GetFolderContentStatistics& LocalFolderScanStats, const std::filesystem::path& Path, std::span<const std::filesystem::path> PathsToCheck, @@ -1368,7 +1368,7 @@ namespace { return NewPaths; } - BuildSaveState GetLocalStateFromPaths(ThreadWorkers& Workers, + BuildSaveState GetLocalStateFromPaths(TransferThreadWorkers& Workers, GetFolderContentStatistics& LocalFolderScanStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, @@ -1444,7 +1444,7 @@ namespace { .LocalPath = Path}; } - BuildSaveState GetLocalContent(ThreadWorkers& Workers, + BuildSaveState GetLocalContent(TransferThreadWorkers& Workers, GetFolderContentStatistics& LocalFolderScanStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, @@ -1595,7 +1595,7 @@ namespace { } ChunkedFolderContent ScanAndChunkFolder( - ThreadWorkers& Workers, + TransferThreadWorkers& Workers, GetFolderContentStatistics& GetFolderContentStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, @@ -1671,7 +1671,7 @@ namespace { }; void DownloadFolder(OperationLogOutput& Output, - ThreadWorkers& Workers, + TransferThreadWorkers& Workers, StorageInstance& Storage, const BuildStorageCache::Statistics& StorageCacheStats, const Oid& BuildId, @@ -2143,7 +2143,7 @@ namespace { } } - void DiffFolders(ThreadWorkers& Workers, + void DiffFolders(TransferThreadWorkers& Workers, const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked) @@ -2423,10 +2423,11 @@ BuildsCommand::BuildsCommand() auto AddWorkerOptions = [this](cxxopts::Options& Ops) { Ops.add_option("", "", - "boost-workers", + "boost-worker-count", "Increase the number of worker threads - may cause computer to be less responsive", - cxxopts::value(m_BoostWorkerThreads), - "<boostworkers>"); + cxxopts::value(m_BoostWorkerCount), + "<boostworkercount>"); + Ops.add_option("", "", "boost-worker-memory", @@ -2434,6 +2435,13 @@ BuildsCommand::BuildsCommand() "be less responsive due to high memory usage", cxxopts::value(m_BoostWorkerMemory), "<boostworkermemory>"); + + Ops.add_option("", + "", + "boost-workers", + "Enables both 'boost-worker-count' and 'boost-worker-memory' - may cause computer to be less responsive", + cxxopts::value(m_BoostWorkers), + "<boostworkermemory>"); }; auto AddZenFolderOptions = [this](cxxopts::Options& Ops) { @@ -3039,6 +3047,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) }; ParseOutputOptions(); + if (m_BoostWorkers) + { + m_BoostWorkerCount = true; + m_BoostWorkerMemory = true; + } + std::unique_ptr<AuthMgr> Auth; std::unique_ptr<OperationLogOutput> Output(CreateConsoleLogOutput(ProgressMode)); @@ -3051,12 +3065,13 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) bool BoostCacheBackgroundWorkerPool) -> StorageInstance { ParseStorageOptions(RequireNamespace, RequireBucket); - HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", - .AssumeHttp2 = m_AssumeHttp2, - .AllowResume = true, - .RetryCount = 2, - .Verbose = m_VerboseHttp, - .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(m_BoostWorkerMemory)}; + HttpClientSettings ClientSettings{ + .LogCategory = "httpbuildsclient", + .AssumeHttp2 = m_AssumeHttp2, + .AllowResume = true, + .RetryCount = 2, + .Verbose = m_VerboseHttp, + .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory)}; std::unique_ptr<AuthMgr> Auth; @@ -3105,14 +3120,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { Result.CacheHttp = std::make_unique<HttpClient>( ResolveRes.CacheUrl, - HttpClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{3000}, - .Timeout = std::chrono::milliseconds{30000}, - .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, - .AllowResume = true, - .RetryCount = 0, - .Verbose = m_VerboseHttp, - .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(m_BoostWorkerMemory)}, + HttpClientSettings{ + .LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, + .AllowResume = true, + .RetryCount = 0, + .Verbose = m_VerboseHttp, + .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory)}, []() { return AbortFlag.load(); }); Result.BuildCacheStorage = CreateZenBuildStorageCache(*Result.CacheHttp, @@ -3151,14 +3167,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { Result.CacheHttp = std::make_unique<HttpClient>( m_ZenCacheHost, - HttpClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{3000}, - .Timeout = std::chrono::milliseconds{30000}, - .AssumeHttp2 = m_AssumeHttp2, - .AllowResume = true, - .RetryCount = 0, - .Verbose = m_VerboseHttp, - .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(m_BoostWorkerMemory)}, + HttpClientSettings{ + .LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = m_AssumeHttp2, + .AllowResume = true, + .RetryCount = 0, + .Verbose = m_VerboseHttp, + .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory)}, []() { return AbortFlag.load(); }); Result.BuildCacheStorage = CreateZenBuildStorageCache(*Result.CacheHttp, @@ -3600,7 +3617,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + TransferThreadWorkers Workers(m_BoostWorkerCount, SingleThreaded); if (!IsQuiet) { ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); @@ -3724,7 +3741,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + TransferThreadWorkers Workers(m_BoostWorkerCount, SingleThreaded); if (!IsQuiet) { ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); @@ -3806,9 +3823,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) .AllowFileClone = m_AllowFileClone, .IncludeWildcards = IncludeWildcards, .ExcludeWildcards = ExcludeWildcards, - .MaximumInMemoryPayloadSize = GetMaxMemoryBufferSize(m_BoostWorkerMemory), - .PopulateCache = m_UploadToZenCache, - .AppendNewContent = m_AppendNewContent}); + .MaximumInMemoryPayloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory), + .PopulateCache = m_UploadToZenCache, + .AppendNewContent = m_AppendNewContent}); if (AbortFlag) { @@ -3865,7 +3882,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + TransferThreadWorkers Workers(m_BoostWorkerCount, SingleThreaded); if (!IsQuiet) { ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); @@ -3888,7 +3905,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + TransferThreadWorkers Workers(m_BoostWorkerCount, SingleThreaded); if (!IsQuiet) { ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); @@ -3971,7 +3988,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + TransferThreadWorkers Workers(m_BoostWorkerCount, SingleThreaded); if (!IsQuiet) { ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); @@ -4024,7 +4041,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + TransferThreadWorkers Workers(m_BoostWorkerCount, SingleThreaded); if (!IsQuiet) { ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); @@ -4072,7 +4089,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_MultiTestDownloadOptions) { - ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + TransferThreadWorkers Workers(m_BoostWorkerCount, SingleThreaded); if (!IsQuiet) { ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); @@ -4187,7 +4204,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_TestOptions) { - ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + TransferThreadWorkers Workers(m_BoostWorkerCount, SingleThreaded); if (!IsQuiet) { ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index 24e2e30e4..081a3a460 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -29,14 +29,15 @@ private: std::filesystem::path m_SystemRootDir; - bool m_PlainProgress = false; - bool m_LogProgress = false; - bool m_Verbose = false; - bool m_BoostWorkerThreads = false; - bool m_BoostWorkerMemory = false; - bool m_UseSparseFiles = true; - bool m_Quiet = false; - bool m_AllowFileClone = true; + bool m_PlainProgress = false; + bool m_LogProgress = false; + bool m_Verbose = false; + bool m_BoostWorkerCount = false; + bool m_BoostWorkerMemory = false; + bool m_BoostWorkers = false; + bool m_UseSparseFiles = true; + bool m_Quiet = false; + bool m_AllowFileClone = true; std::filesystem::path m_ZenFolderPath; diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 02436f4da..15fb1f16c 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -27,10 +27,10 @@ #include <zenremotestore/operationlogoutput.h> #include <zenremotestore/projectstore/projectstoreoperations.h> #include <zenremotestore/projectstore/remoteprojectstore.h> +#include <zenremotestore/transferthreadworkers.h> #include <zenutil/workerpools.h> #include "../progressbar.h" -#include "../threadworkers.h" ZEN_THIRD_PARTY_INCLUDES_START #include <json11.hpp> @@ -950,7 +950,30 @@ ExportOplogCommand::ExportOplogCommand() m_Options.add_option("", "", "plainprogress", "Use (legacy) plain progress update", cxxopts::value(m_PlainProgress), "<plainprogress>"); + m_Options.add_option("", + "", + "boost-worker-count", + "Increase the number of worker threads - may cause computer to be less responsive", + cxxopts::value(m_BoostWorkerCount), + "<boostworkercount>"); + + m_Options.add_option("", + "", + "boost-worker-memory", + "Increase the limit where we write downloaded data to temporary storage to conserve space - may cause computer to " + "be less responsive due to high memory usage", + cxxopts::value(m_BoostWorkerMemory), + "<boostworkermemory>"); + + m_Options.add_option("", + "", + "boost-workers", + "Enables both 'boost-worker-count' and 'boost-worker-memory' - may cause computer to be less responsive", + cxxopts::value(m_BoostWorkers), + "<boostworkermemory>"); + m_Options.parse_positional({"project", "oplog"}); + m_Options.positional_help("[<projectid> <oplogid>]"); } ExportOplogCommand::~ExportOplogCommand() @@ -969,6 +992,12 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg return; } + if (m_BoostWorkers) + { + m_BoostWorkerCount = true; + m_BoostWorkerMemory = true; + } + m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) @@ -1152,6 +1181,15 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddBool("ignoremissingattachments"sv, true); } + if (m_BoostWorkerCount) + { + Writer.AddBool("boostworkercount"sv, true); + } + if (m_BoostWorkerMemory) + { + Writer.AddBool("boostworkermemory"sv, true); + } + Writer.AddBool("async"sv, true); if (!m_FileDirectoryPath.empty()) { @@ -1402,6 +1440,28 @@ ImportOplogCommand::ImportOplogCommand() m_Options.add_option("", "", "plainprogress", "Use (legacy) plain progress update", cxxopts::value(m_PlainProgress), "<plainprogress>"); + m_Options.add_option("", + "", + "boost-worker-count", + "Increase the number of worker threads - may cause computer to be less responsive", + cxxopts::value(m_BoostWorkerCount), + "<boostworkercount>"); + + m_Options.add_option("", + "", + "boost-worker-memory", + "Increase the limit where we write downloaded data to temporary storage to conserve space - may cause computer to " + "be less responsive due to high memory usage", + cxxopts::value(m_BoostWorkerMemory), + "<boostworkermemory>"); + + m_Options.add_option("", + "", + "boost-workers", + "Enables both 'boost-worker-count' and 'boost-worker-memory' - may cause computer to be less responsive", + cxxopts::value(m_BoostWorkers), + "<boostworkermemory>"); + m_Options.parse_positional({"project", "oplog", "gcpath"}); m_Options.positional_help("[<projectid> <oplogid> [<gcpath>]]"); } @@ -1422,6 +1482,12 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg return; } + if (m_BoostWorkers) + { + m_BoostWorkerCount = true; + m_BoostWorkerMemory = true; + } + m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) @@ -1567,6 +1633,14 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddBool("force"sv, true); } + if (m_BoostWorkerCount) + { + Writer.AddBool("boostworkercount"sv, true); + } + if (m_BoostWorkerMemory) + { + Writer.AddBool("boostworkermemory"sv, true); + } if (!m_FileDirectoryPath.empty()) { Writer.BeginObject("file"sv); @@ -2278,12 +2352,29 @@ OplogDownloadCommand::OplogDownloadCommand() m_Options.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); m_Options.add_option("", "", "force", "Force download and disregard local cache", cxxopts::value(m_ForceDownload), "<force>"); + m_Options.add_option("", "", - "boost-workers", + "boost-worker-count", "Increase the number of worker threads - may cause computer to be less responsive", - cxxopts::value(m_BoostWorkerThreads), - "<boostworkers>"); + cxxopts::value(m_BoostWorkerCount), + "<boostworkercount>"); + + m_Options.add_option("", + "", + "boost-worker-memory", + "Increase the limit where we write downloaded data to temporary storage to conserve space - may cause computer to " + "be less responsive due to high memory usage", + cxxopts::value(m_BoostWorkerMemory), + "<boostworkermemory>"); + + m_Options.add_option("", + "", + "boost-workers", + "Enables both 'boost-worker-count' and 'boost-worker-memory' - may cause computer to be less responsive", + cxxopts::value(m_BoostWorkers), + "<boostworkermemory>"); + m_Options.add_option("", "", "decompress", "Decompress downloaded attachment", cxxopts::value(m_DecompressAttachments), "<decompress>"); m_Options.add_option("", @@ -2420,9 +2511,15 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a ParseStorageOptions(/*RequireNamespace*/ true, /*RequireBucket*/ true); + if (m_BoostWorkers) + { + m_BoostWorkerCount = true; + m_BoostWorkerMemory = true; + } + std::unique_ptr<OperationLogOutput> OperationLogOutput(CreateConsoleLogOutput(ProgressMode)); - ThreadWorkers Workers(m_BoostWorkerThreads, /*SingleThreaded*/ false); + TransferThreadWorkers Workers(m_BoostWorkerCount, /*SingleThreaded*/ false); if (!m_Quiet) { ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); @@ -2461,8 +2558,9 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a StorageInstance Storage; - ClientSettings.AssumeHttp2 = ResolveRes.HostAssumeHttp2; - Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings); + ClientSettings.AssumeHttp2 = ResolveRes.HostAssumeHttp2; + ClientSettings.MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u; + Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings); BuildStorageCache::Statistics StorageCacheStats; @@ -2470,14 +2568,17 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a if (!ResolveRes.CacheUrl.empty()) { - Storage.CacheHttp = std::make_unique<HttpClient>(ResolveRes.CacheUrl, - HttpClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{3000}, - .Timeout = std::chrono::milliseconds{30000}, - .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, - .AllowResume = true, - .RetryCount = 0}, - [&AbortFlag]() { return AbortFlag.load(); }); + Storage.CacheHttp = std::make_unique<HttpClient>( + ResolveRes.CacheUrl, + HttpClientSettings{ + .LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, + .AllowResume = true, + .RetryCount = 0, + .MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u}, + [&AbortFlag]() { return AbortFlag.load(); }); Storage.CacheName = ResolveRes.CacheName; } diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h index f80be5f71..56ef858f5 100644 --- a/src/zen/cmds/projectstore_cmd.h +++ b/src/zen/cmds/projectstore_cmd.h @@ -125,6 +125,9 @@ private: std::string m_BuildsMetadataPath; std::string m_BuildsMetadata; + bool m_BoostWorkerCount = false; + bool m_BoostWorkerMemory = false; + bool m_BoostWorkers = false; bool m_IgnoreMissingAttachments = false; std::string m_ZenUrl; @@ -186,6 +189,10 @@ private: std::string m_FileDirectoryPath; std::string m_FileName; + + bool m_BoostWorkerCount = false; + bool m_BoostWorkerMemory = false; + bool m_BoostWorkers = false; }; class SnapshotOplogCommand : public ProjectStoreCommand @@ -307,9 +314,11 @@ private: std::string m_Namespace; std::string m_Bucket; std::string m_BuildId; - bool m_ForceDownload = false; - bool m_BoostWorkerThreads = false; - bool m_UploadToZenCache = true; + bool m_ForceDownload = false; + bool m_BoostWorkerCount = false; + bool m_BoostWorkerMemory = false; + bool m_BoostWorkers = false; + bool m_UploadToZenCache = true; std::filesystem::path m_OplogOutputPath; bool m_DecompressAttachments = true; diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 223c668cd..eca654223 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -477,14 +477,18 @@ struct GenerateBlocksStatistics } }; +static constexpr size_t DefaultMaxChunkBlockSize = 64u * 1024u * 1024u; +static constexpr size_t DefaultMaxChunksPerChunkBlock = 4u * 1000u; +static constexpr size_t DefaultMaxChunkBlockEmbedSize = 3u * 512u * 1024u; + class BuildsOperationUploadFolder { public: struct ChunksBlockParameters { - size_t MaxBlockSize = 64u * 1024u * 1024u; - size_t MaxChunksPerBlock = 4u * 1000u; - size_t MaxChunkEmbedSize = 3u * 512u * 1024u; + size_t MaxBlockSize = DefaultMaxChunkBlockSize; + size_t MaxChunksPerBlock = DefaultMaxChunksPerChunkBlock; + size_t MaxChunkEmbedSize = DefaultMaxChunkBlockEmbedSize; }; struct Options diff --git a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h index bb26f55a0..e8b7c15c0 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h @@ -25,6 +25,7 @@ struct BuildsRemoteStoreOptions : RemoteStoreOptions bool AssumeHttp2 = false; bool PopulateCache = true; IoBuffer MetaData; + size_t MaximumInMemoryDownloadSize = 1024u * 1024u; }; std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(LoggerRef InLog, diff --git a/src/zen/threadworkers.h b/src/zenremotestore/include/zenremotestore/transferthreadworkers.h index bb80650a2..a7faacfd5 100644 --- a/src/zen/threadworkers.h +++ b/src/zenremotestore/include/zenremotestore/transferthreadworkers.h @@ -2,6 +2,7 @@ #pragma once +#include <zenbase/refcount.h> #include <zencore/timer.h> #include <zencore/zencore.h> @@ -12,10 +13,10 @@ namespace zen { class WorkerThreadPool; -class ThreadWorkers +class TransferThreadWorkers : public RefCounted { public: - ThreadWorkers(bool BoostWorkers, bool SingleThreaded); + TransferThreadWorkers(bool BoostWorkers, bool SingleThreaded); WorkerThreadPool& GetIOWorkerPool() { @@ -30,6 +31,9 @@ public: const std::string& GetWorkersInfo() const { return WorkersInfo; } + bool IsBoostWorkers() const { return BoostWorkers; } + bool IsSingleThreaded() const { return SingleThreaded; } + private: WorkerThreadPool* m_NetworkPool = nullptr; WorkerThreadPool* m_IOPool = nullptr; @@ -38,6 +42,8 @@ private: std::unique_ptr<WorkerThreadPool> m_IOPoolInstance; std::string WorkersInfo; + const bool BoostWorkers; + const bool SingleThreaded; }; } // namespace zen diff --git a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp index bd793b745..a8e883dde 100644 --- a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp @@ -690,24 +690,27 @@ CreateJupiterBuildsRemoteStore(LoggerRef InLog, ResolveBuildStorage(*Output, ClientSettings, Host, OverrideUrl, ZenHost, ZenCacheResolveMode::Discovery, /*Verbose*/ false); } - HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", - .ConnectTimeout = std::chrono::milliseconds(3000), - .Timeout = std::chrono::milliseconds(1800000), - .AccessTokenProvider = std::move(TokenProvider), - .AssumeHttp2 = ResolveRes.HostAssumeHttp2, - .AllowResume = true, - .RetryCount = 4}; + HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", + .ConnectTimeout = std::chrono::milliseconds(3000), + .Timeout = std::chrono::milliseconds(1800000), + .AccessTokenProvider = std::move(TokenProvider), + .AssumeHttp2 = ResolveRes.HostAssumeHttp2, + .AllowResume = true, + .RetryCount = 4, + .MaximumInMemoryDownloadSize = Options.MaximumInMemoryDownloadSize}; std::unique_ptr<HttpClientSettings> CacheClientSettings; if (!ResolveRes.CacheUrl.empty()) { - CacheClientSettings = std::make_unique<HttpClientSettings>(HttpClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{3000}, - .Timeout = std::chrono::milliseconds{30000}, - .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, - .AllowResume = true, - .RetryCount = 0}); + CacheClientSettings = + std::make_unique<HttpClientSettings>(HttpClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, + .AllowResume = true, + .RetryCount = 0, + .MaximumInMemoryDownloadSize = Options.MaximumInMemoryDownloadSize}); } std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(InLog, diff --git a/src/zen/threadworkers.cpp b/src/zenremotestore/transferthreadworkers.cpp index 2220b9c95..4f863027d 100644 --- a/src/zen/threadworkers.cpp +++ b/src/zenremotestore/transferthreadworkers.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include "threadworkers.h" +#include <zenremotestore/transferthreadworkers.h> #include <zencore/intmath.h> #include <zencore/logging.h> @@ -11,7 +11,9 @@ namespace zen { -ThreadWorkers::ThreadWorkers(bool BoostWorkers, bool SingleThreaded) +TransferThreadWorkers::TransferThreadWorkers(bool InBoostWorkers, bool InSingleThreaded) +: BoostWorkers(InBoostWorkers) +, SingleThreaded(InSingleThreaded) { if (SingleThreaded) { diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp index 11231a203..4e947f221 100644 --- a/src/zenserver/storage/projectstore/httpprojectstore.cpp +++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp @@ -19,6 +19,7 @@ #include <zenremotestore/projectstore/jupiterremoteprojectstore.h> #include <zenremotestore/projectstore/remoteprojectstore.h> #include <zenremotestore/projectstore/zenremoteprojectstore.h> +#include <zenremotestore/transferthreadworkers.h> #include <zenstore/oplogreferencedset.h> #include <zenstore/projectstore.h> #include <zenstore/zenstore.h> @@ -250,6 +251,7 @@ namespace { AuthMgr& AuthManager, size_t MaxBlockSize, size_t MaxChunkEmbedSize, + size_t MaximumInMemoryDownloadSize, const std::filesystem::path& TempFilePath) { ZEN_MEMSCOPE(GetProjectHttpTag()); @@ -485,7 +487,8 @@ namespace { ForceDisableTempBlocks, AssumeHttp2, PopulateCache, - MetaData}; + MetaData, + MaximumInMemoryDownloadSize}; RemoteStore = CreateJupiterBuildsRemoteStore(Log(), Options, TempFilePath, @@ -515,10 +518,17 @@ namespace { : fmt::format("{}: {}", Result.Reason, Result.Text)}; } + static uint64_t GetMaxMemoryBufferSize(size_t MaxBlockSize, bool BoostWorkerMemory) + { + return BoostWorkerMemory ? (MaxBlockSize + 16u * 1024u) : 1024u * 1024u; + } + } // namespace ////////////////////////////////////////////////////////////////////////// +////////////////////////////////////////////////////////////////////////// + HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatusService& StatusService, @@ -2639,9 +2649,16 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); bool CleanOplog = Params["clean"].AsBool(false); + bool BoostWorkerCount = Params["boostworkercount"].AsBool(false); + bool BoostWorkerMemory = Params["boostworkermemory"sv].AsBool(false); - CreateRemoteStoreResult RemoteStoreResult = - CreateRemoteStore(Log(), Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath()); + CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Log(), + Params, + m_AuthMgr, + MaxBlockSize, + MaxChunkEmbedSize, + GetMaxMemoryBufferSize(MaxBlockSize, BoostWorkerMemory), + Oplog->TempPath()); if (RemoteStoreResult.Store == nullptr) { @@ -2658,14 +2675,19 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) Oplog, Force, IgnoreMissingAttachments, - CleanOplog](JobContext& Context) { + CleanOplog, + BoostWorkerCount](JobContext& Context) { Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}", Oplog->GetOuterProjectIdentifier(), Oplog->OplogId(), ActualRemoteStore->GetInfo().Description)); - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + Ref<TransferThreadWorkers> Workers = GetThreadWorkers(BoostWorkerCount, /*SingleThreaded*/ false); + + WorkerThreadPool& WorkerPool = Workers->GetIOWorkerPool(); + WorkerThreadPool& NetworkWorkerPool = Workers->GetNetworkPool(); + + Context.ReportMessage(fmt::format("{}", Workers->GetWorkersInfo())); RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, @@ -2697,9 +2719,16 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); + bool BoostWorkerCount = Params["boostworkercount"].AsBool(false); + bool BoostWorkerMemory = Params["boostworkermemory"sv].AsBool(false); - CreateRemoteStoreResult RemoteStoreResult = - CreateRemoteStore(Log(), Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath()); + CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Log(), + Params, + m_AuthMgr, + MaxBlockSize, + MaxChunkEmbedSize, + GetMaxMemoryBufferSize(MaxBlockSize, BoostWorkerMemory), + Oplog->TempPath()); if (RemoteStoreResult.Store == nullptr) { @@ -2720,7 +2749,8 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) ChunkFileSizeLimit, EmbedLooseFile, Force, - IgnoreMissingAttachments](JobContext& Context) { + IgnoreMissingAttachments, + BoostWorkerCount](JobContext& Context) { Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", Project->Identifier, Oplog->OplogId(), @@ -2728,8 +2758,12 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize))); - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + Ref<TransferThreadWorkers> Workers = GetThreadWorkers(BoostWorkerCount, /*SingleThreaded*/ false); + + Context.ReportMessage(fmt::format("{}", Workers->GetWorkersInfo())); + + WorkerThreadPool& WorkerPool = Workers->GetIOWorkerPool(); + WorkerThreadPool& NetworkWorkerPool = Workers->GetNetworkPool(); RemoteProjectStore::Result Result = SaveOplog(m_CidStore, *ActualRemoteStore, @@ -3322,4 +3356,16 @@ HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req) } } +Ref<TransferThreadWorkers> +HttpProjectService::GetThreadWorkers(bool BoostWorkers, bool SingleThreaded) +{ + RwLock::ExclusiveLockScope _(m_ThreadWorkersLock); + if (m_ThreadWorkers && m_ThreadWorkers->IsBoostWorkers() == BoostWorkers && m_ThreadWorkers->IsSingleThreaded() == SingleThreaded) + { + return m_ThreadWorkers; + } + m_ThreadWorkers = new TransferThreadWorkers(BoostWorkers, SingleThreaded); + return m_ThreadWorkers; +} + } // namespace zen diff --git a/src/zenserver/storage/projectstore/httpprojectstore.h b/src/zenserver/storage/projectstore/httpprojectstore.h index f6fe63614..b742102a5 100644 --- a/src/zenserver/storage/projectstore/httpprojectstore.h +++ b/src/zenserver/storage/projectstore/httpprojectstore.h @@ -14,6 +14,7 @@ class AuthMgr; class JobQueue; class OpenProcessCache; class ProjectStore; +class TransferThreadWorkers; ////////////////////////////////////////////////////////////////////////// // @@ -95,17 +96,21 @@ private: inline LoggerRef Log() { return m_Log; } - LoggerRef m_Log; - CidStore& m_CidStore; - HttpRequestRouter m_Router; - Ref<ProjectStore> m_ProjectStore; - HttpStatusService& m_StatusService; - HttpStatsService& m_StatsService; - AuthMgr& m_AuthMgr; - OpenProcessCache& m_OpenProcessCache; - JobQueue& m_JobQueue; - ProjectStats m_ProjectStats; - metrics::OperationTiming m_HttpRequests; + LoggerRef m_Log; + CidStore& m_CidStore; + HttpRequestRouter m_Router; + Ref<ProjectStore> m_ProjectStore; + HttpStatusService& m_StatusService; + HttpStatsService& m_StatsService; + AuthMgr& m_AuthMgr; + OpenProcessCache& m_OpenProcessCache; + JobQueue& m_JobQueue; + ProjectStats m_ProjectStats; + metrics::OperationTiming m_HttpRequests; + RwLock m_ThreadWorkersLock; + Ref<TransferThreadWorkers> m_ThreadWorkers; + + Ref<TransferThreadWorkers> GetThreadWorkers(bool BoostWorkers, bool SingleThreaded); }; } // namespace zen |