aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/include
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2025-07-29 23:04:15 +0000
committerLiam Mitchell <[email protected]>2025-07-29 23:04:15 +0000
commitbf0039cbab6dc21ce09c15be60878ee4208d8723 (patch)
tree553353471925c72459b91563ccceb17accd51ec3 /src/zenutil/include
parentAlways upload vcpkg logs on failure (diff)
parent5.6.14 (diff)
downloadzen-bf0039cbab6dc21ce09c15be60878ee4208d8723.tar.xz
zen-bf0039cbab6dc21ce09c15be60878ee4208d8723.zip
Merge branch 'main' into de/zen-service-command
Diffstat (limited to 'src/zenutil/include')
-rw-r--r--src/zenutil/include/zenutil/bufferedwritefilecache.h106
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h27
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h57
-rw-r--r--src/zenutil/include/zenutil/chunkedcontent.h57
-rw-r--r--src/zenutil/include/zenutil/chunkingcontroller.h41
-rw-r--r--src/zenutil/include/zenutil/commandlineoptions.h29
-rw-r--r--src/zenutil/include/zenutil/environmentoptions.h92
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h1
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h28
-rw-r--r--src/zenutil/include/zenutil/logging/rotatingfilesink.h1
-rw-r--r--src/zenutil/include/zenutil/parallellwork.h117
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h77
-rw-r--r--src/zenutil/include/zenutil/workerpools.h3
13 files changed, 462 insertions, 174 deletions
diff --git a/src/zenutil/include/zenutil/bufferedwritefilecache.h b/src/zenutil/include/zenutil/bufferedwritefilecache.h
new file mode 100644
index 000000000..68d6c375e
--- /dev/null
+++ b/src/zenutil/include/zenutil/bufferedwritefilecache.h
@@ -0,0 +1,106 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/basicfile.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+class CompositeBuffer;
+
+class BufferedWriteFileCache
+{
+public:
+ BufferedWriteFileCache(const BufferedWriteFileCache&) = delete;
+ BufferedWriteFileCache& operator=(const BufferedWriteFileCache&) = delete;
+
+ BufferedWriteFileCache();
+
+ ~BufferedWriteFileCache();
+
+ std::unique_ptr<BasicFile> Get(uint32_t FileIndex);
+
+ void Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer);
+
+ void Close(std::span<uint32_t> FileIndexes);
+
+ class Local
+ {
+ public:
+ struct Writer
+ {
+ std::unique_ptr<BasicFile> File;
+ std::unique_ptr<BasicFileWriter> Writer;
+
+ inline void Write(const CompositeBuffer& Chunk, uint64_t FileOffset)
+ {
+ if (Writer)
+ {
+ Writer->Write(Chunk, FileOffset);
+ }
+ else
+ {
+ File->Write(Chunk, FileOffset);
+ }
+ }
+ };
+
+ Local(const Local&) = delete;
+ Local& operator=(const Local&) = delete;
+
+ explicit Local(BufferedWriteFileCache& Cache);
+ ~Local();
+
+ Writer* GetWriter(uint32_t FileIndex);
+ Writer* PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer);
+
+ private:
+ tsl::robin_map<uint32_t, uint32_t> m_FileIndexToWriterIndex;
+ std::vector<std::unique_ptr<Writer>> m_ChunkWriters;
+ BufferedWriteFileCache& m_Cache;
+ };
+
+private:
+ static constexpr size_t MaxHandlesPerPath = 7;
+ static constexpr size_t MaxBufferedCount = 1024;
+ struct TOpenHandles
+ {
+ BasicFile* Files[MaxHandlesPerPath];
+ uint64_t Size = 0;
+ inline BasicFile* Pop()
+ {
+ if (Size > 0)
+ {
+ return Files[--Size];
+ }
+ else
+ {
+ return nullptr;
+ }
+ }
+ inline bool Push(BasicFile* File)
+ {
+ if (Size < MaxHandlesPerPath)
+ {
+ Files[Size++] = File;
+ return true;
+ }
+ return false;
+ }
+ };
+ static_assert(sizeof(TOpenHandles) == 64);
+
+ RwLock m_WriterLock;
+ tsl::robin_map<uint32_t, uint32_t> m_ChunkWriters;
+ std::vector<TOpenHandles> m_OpenFiles;
+ std::atomic<uint32_t> m_CacheHitCount;
+ std::atomic<uint32_t> m_CacheMissCount;
+ std::atomic<uint32_t> m_OpenHandleCount;
+ std::atomic<uint32_t> m_DroppedHandleCount;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
index 9d2bab170..f49d4b42a 100644
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -5,6 +5,10 @@
#include <zencore/compactbinary.h>
#include <zenutil/chunkblock.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace zen {
class BuildStorage
@@ -21,6 +25,7 @@ public:
virtual ~BuildStorage() {}
+ virtual CbObject ListNamespaces(bool bRecursive = false) = 0;
virtual CbObject ListBuilds(CbObject Query) = 0;
virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) = 0;
virtual CbObject GetBuild(const Oid& BuildId) = 0;
@@ -43,16 +48,18 @@ public:
virtual IoBuffer GetBuildBlob(const Oid& BuildId,
const IoHash& RawHash,
uint64_t RangeOffset = 0,
- uint64_t RangeBytes = (uint64_t)-1) = 0;
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(
- const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0;
-
- virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) = 0;
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) = 0;
+
+ [[nodiscard]] virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0;
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
+
+ virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0;
};
} // namespace zen
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
new file mode 100644
index 000000000..e1fb73fd4
--- /dev/null
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -0,0 +1,57 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compositebuffer.h>
+#include <zenutil/chunkblock.h>
+
+namespace zen {
+
+class HttpClient;
+
+class BuildStorageCache
+{
+public:
+ struct Statistics
+ {
+ std::atomic<uint64_t> TotalBytesRead = 0;
+ std::atomic<uint64_t> TotalBytesWritten = 0;
+ std::atomic<uint64_t> TotalRequestCount = 0;
+ std::atomic<uint64_t> TotalRequestTimeUs = 0;
+ std::atomic<uint64_t> TotalExecutionTimeUs = 0;
+ };
+
+ virtual ~BuildStorageCache() {}
+
+ virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0;
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t RangeOffset = 0,
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
+
+ virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0;
+ virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+
+ struct BlobExistsResult
+ {
+ bool HasBody = 0;
+ bool HasMetadata = 0;
+ };
+
+ virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+
+ virtual void Flush(
+ int32_t UpdateIntervalMS,
+ std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0;
+};
+
+std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount);
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
index 57b55cb8e..306a5d990 100644
--- a/src/zenutil/include/zenutil/chunkedcontent.h
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -67,7 +67,7 @@ FolderContent GetFolderContent(GetFolderContentStatistics& Stats,
std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory,
std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile,
WorkerThreadPool& WorkerPool,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag);
@@ -94,10 +94,31 @@ struct ChunkedFolderContent
ChunkedContentData ChunkedContent;
};
+struct ChunkedContentLookup
+{
+ struct ChunkSequenceLocation
+ {
+ uint32_t SequenceIndex = (uint32_t)-1;
+ uint64_t Offset = (uint64_t)-1;
+ };
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex;
+ std::vector<uint32_t> SequenceIndexChunkOrderOffset;
+ std::vector<ChunkSequenceLocation> ChunkSequenceLocations;
+ std::vector<size_t>
+ ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex
+ std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex
+ std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash
+ std::vector<uint32_t> PathExtensionHash;
+};
+
void SaveChunkedFolderContentToCompactBinary(const ChunkedFolderContent& Content, CbWriter& Output);
ChunkedFolderContent LoadChunkedFolderContentToCompactBinary(CbObjectView Input);
ChunkedFolderContent MergeChunkedFolderContents(const ChunkedFolderContent& Base, std::span<const ChunkedFolderContent> Overlays);
+ChunkedFolderContent DeletePathsFromChunkedContent(const ChunkedFolderContent& Base,
+ const ChunkedContentLookup& BaseContentLookup,
+ std::span<const std::filesystem::path> DeletedPaths);
ChunkedFolderContent DeletePathsFromChunkedContent(const ChunkedFolderContent& Base, std::span<const std::filesystem::path> DeletedPaths);
struct ChunkingStatistics
@@ -111,31 +132,15 @@ struct ChunkingStatistics
uint64_t ElapsedWallTimeUS = 0;
};
-ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
- WorkerThreadPool& WorkerPool,
- const std::filesystem::path& RootPath,
- const FolderContent& Content,
- const ChunkingController& InChunkingController,
- int32_t UpdateInteralMS,
- std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
- std::atomic<bool>& AbortFlag);
-
-struct ChunkedContentLookup
-{
- struct ChunkSequenceLocation
- {
- uint32_t SequenceIndex = (uint32_t)-1;
- uint64_t Offset = (uint64_t)-1;
- };
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex;
- std::vector<uint32_t> SequenceIndexChunkOrderOffset;
- std::vector<ChunkSequenceLocation> ChunkSequenceLocations;
- std::vector<size_t>
- ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex
- std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex
- std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash
-};
+ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& RootPath,
+ const FolderContent& Content,
+ const ChunkingController& InChunkingController,
+ int32_t UpdateIntervalMS,
+ std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag);
ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content);
diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h
index 246f4498a..315502265 100644
--- a/src/zenutil/include/zenutil/chunkingcontroller.h
+++ b/src/zenutil/include/zenutil/chunkingcontroller.h
@@ -11,7 +11,11 @@
namespace zen {
-const std::vector<std::string_view> DefaultChunkingExcludeExtensions = {".exe", ".dll", ".pdb", ".self", ".mp4"};
+const std::vector<std::string> DefaultChunkingExcludeExtensions =
+ {".exe", ".dll", ".pdb", ".self", ".mp4", ".zip", ".7z", ".bzip", ".rar", ".gzip"};
+const std::vector<std::string> DefaultFixedChunkingExtensions = {".apk", ".nsp", ".xvc", ".pkg", ".dmg", ".ipa"};
+const bool DefaultChunkingExcludeElfFiles = true;
+const bool DefaultChunkingExcludeMachOFiles = true;
const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128u,
.MaxSize = 128u * 1024u,
@@ -19,7 +23,8 @@ const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128
const size_t DefaultChunkingFileSizeLimit = DefaultChunkedParams.MaxSize;
-const uint32_t DefaultFixedChunkingChunkSize = 16u * 1024u * 1024u;
+const uint64_t DefaultFixedChunkingChunkSize = 32u * 1024u * 1024u;
+const uint64_t DefaultMinSizeForFixedChunking = DefaultFixedChunkingChunkSize * 8u;
struct ChunkedInfoWithSource;
@@ -38,17 +43,31 @@ public:
virtual CbObject GetParameters() const = 0;
};
-std::unique_ptr<ChunkingController> CreateBasicChunkingController(
- std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions,
- uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit,
- const ChunkedParams& ChunkingParams = DefaultChunkedParams);
+struct BasicChunkingControllerSettings
+{
+ std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions;
+ bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles;
+ bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles;
+ uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit;
+ ChunkedParams ChunkingParams = DefaultChunkedParams;
+};
+
+std::unique_ptr<ChunkingController> CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings);
std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters);
-std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(
- std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions,
- uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit,
- const ChunkedParams& ChunkingParams = DefaultChunkedParams,
- uint32_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize);
+struct ChunkingControllerWithFixedChunkingSettings
+{
+ std::vector<std::string> FixedChunkingExtensions = DefaultFixedChunkingExtensions;
+ std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions;
+ bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles;
+ bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles;
+ uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit;
+ ChunkedParams ChunkingParams = DefaultChunkedParams;
+ uint64_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize;
+ uint64_t MinSizeForFixedChunking = DefaultMinSizeForFixedChunking;
+};
+
+std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting);
std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(CbObjectView Parameters);
std::unique_ptr<ChunkingController> CreateChunkingController(std::string_view Name, CbObjectView Parameters);
diff --git a/src/zenutil/include/zenutil/commandlineoptions.h b/src/zenutil/include/zenutil/commandlineoptions.h
new file mode 100644
index 000000000..f927d41e5
--- /dev/null
+++ b/src/zenutil/include/zenutil/commandlineoptions.h
@@ -0,0 +1,29 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+#include <filesystem>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+
+namespace cxxopts::values {
+// We declare this specialization before including cxxopts to make it stick
+void parse_value(const std::string& text, std::filesystem::path& value);
+} // namespace cxxopts::values
+
+#include <cxxopts.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+std::vector<std::string> ParseCommandLine(std::string_view CommandLine);
+std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArgs);
+void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path);
+[[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path);
+std::filesystem::path StringToPath(const std::string_view& Path);
+std::string_view RemoveQuotes(const std::string_view& Arg);
+
+void commandlineoptions_forcelink(); // internal
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/environmentoptions.h b/src/zenutil/include/zenutil/environmentoptions.h
new file mode 100644
index 000000000..7418608e4
--- /dev/null
+++ b/src/zenutil/include/zenutil/environmentoptions.h
@@ -0,0 +1,92 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/string.h>
+#include <zenutil/commandlineoptions.h>
+
+namespace zen {
+
+class EnvironmentOptions
+{
+public:
+ class OptionValue
+ {
+ public:
+ virtual void Parse(std::string_view Value) = 0;
+
+ virtual ~OptionValue() {}
+ };
+
+ class StringOption : public OptionValue
+ {
+ public:
+ explicit StringOption(std::string& Value);
+ virtual void Parse(std::string_view Value) override;
+ std::string& RefValue;
+ };
+
+ class FilePathOption : public OptionValue
+ {
+ public:
+ explicit FilePathOption(std::filesystem::path& Value);
+ virtual void Parse(std::string_view Value) override;
+ std::filesystem::path& RefValue;
+ };
+
+ class BoolOption : public OptionValue
+ {
+ public:
+ explicit BoolOption(bool& Value);
+ virtual void Parse(std::string_view Value);
+ bool& RefValue;
+ };
+
+ template<Integral T>
+ class NumberOption : public OptionValue
+ {
+ public:
+ explicit NumberOption(T& Value) : RefValue(Value) {}
+ virtual void Parse(std::string_view Value) override
+ {
+ if (std::optional<T> OptionalValue = ParseInt<T>(Value); OptionalValue.has_value())
+ {
+ RefValue = OptionalValue.value();
+ }
+ }
+ T& RefValue;
+ };
+
+ struct Option
+ {
+ std::string CommandLineOptionName;
+ std::shared_ptr<OptionValue> Value;
+ };
+
+ std::shared_ptr<OptionValue> MakeOption(std::string& Value);
+ std::shared_ptr<OptionValue> MakeOption(std::filesystem::path& Value);
+
+ template<Integral T>
+ std::shared_ptr<OptionValue> MakeOption(T& Value)
+ {
+ return std::make_shared<NumberOption<T>>(Value);
+ };
+
+ std::shared_ptr<OptionValue> MakeOption(bool& Value);
+
+ template<typename T>
+ void AddOption(std::string_view EnvName, T& Value, std::string_view CommandLineOptionName = "")
+ {
+ OptionMap.insert_or_assign(std::string(EnvName),
+ Option{.CommandLineOptionName = std::string(CommandLineOptionName), .Value = MakeOption(Value)});
+ };
+
+ EnvironmentOptions();
+
+ void Parse(const cxxopts::ParseResult& CmdLineResult);
+
+private:
+ std::unordered_map<std::string, Option> OptionMap;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
index 89fc70140..bbf070993 100644
--- a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
+++ b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
@@ -13,5 +13,6 @@ std::unique_ptr<BuildStorage> CreateJupiterBuildStorage(LoggerRef InLog,
BuildStorage::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
+ bool AllowRedirect,
const std::filesystem::path& TempFolderPath);
} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index 2c5fc73b8..b79790f25 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -65,7 +65,7 @@ struct FinalizeBuildPartResult : JupiterResult
class JupiterSession
{
public:
- JupiterSession(LoggerRef InLog, HttpClient& InHttpClient);
+ JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect);
~JupiterSession();
JupiterResult Authenticate();
@@ -102,6 +102,8 @@ public:
std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+ JupiterResult ListBuildNamespaces();
+ JupiterResult ListBuildBuckets(std::string_view Namespace);
JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload);
JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
@@ -135,13 +137,14 @@ public:
uint64_t PayloadSize,
std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems);
- JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
- std::vector<std::function<JupiterResult()>>& OutWorkItems);
+ JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems);
JupiterResult PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
@@ -152,9 +155,15 @@ public:
const Oid& BuildId,
const Oid& PartId,
const IoHash& RawHash);
- JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount);
JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload);
+ JupiterResult PutBuildPartStats(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ IoBuffer Payload);
+
private:
inline LoggerRef Log() { return m_Log; }
@@ -164,6 +173,7 @@ private:
LoggerRef m_Log;
HttpClient& m_HttpClient;
+ const bool m_AllowRedirect = false;
};
} // namespace zen
diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
index 758722156..cd28bdcb2 100644
--- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h
+++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
@@ -27,7 +27,6 @@ public:
{
ZEN_MEMSCOPE(ELLMTag::Logging);
- ZEN_MEMSCOPE(ELLMTag::Logging);
std::error_code Ec;
if (RotateOnOpen)
{
diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h
deleted file mode 100644
index 79798fc8d..000000000
--- a/src/zenutil/include/zenutil/parallellwork.h
+++ /dev/null
@@ -1,117 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/except.h>
-#include <zencore/fmtutils.h>
-#include <zencore/thread.h>
-#include <zencore/workthreadpool.h>
-
-#include <atomic>
-
-namespace zen {
-
-class ParallellWork
-{
-public:
- ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {}
-
- ~ParallellWork()
- {
- // Make sure to call Wait before destroying
- ZEN_ASSERT(m_PendingWork.Remaining() == 0);
- }
-
- std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction()
- {
- return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) {
- m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); });
- AbortFlag = true;
- };
- }
-
- void ScheduleWork(WorkerThreadPool& WorkerPool,
- std::function<void(std::atomic<bool>& AbortFlag)>&& Work,
- std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError)
- {
- m_PendingWork.AddCount(1);
- try
- {
- WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] {
- try
- {
- Work(m_AbortFlag);
- }
- catch (const AssertException& AssertEx)
- {
- OnError(
- std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())),
- m_AbortFlag);
- }
- catch (const std::system_error& SystemError)
- {
- if (IsOOM(SystemError.code()))
- {
- OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- else if (IsOOD(SystemError.code()))
- {
- OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- else
- {
- OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- }
- catch (const std::exception& Ex)
- {
- OnError(Ex, m_AbortFlag);
- }
- m_PendingWork.CountDown();
- });
- }
- catch (const std::exception&)
- {
- m_PendingWork.CountDown();
- throw;
- }
- }
-
- void Abort() { m_AbortFlag = true; }
-
- bool IsAborted() const { return m_AbortFlag.load(); }
-
- void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback)
- {
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- m_PendingWork.CountDown();
- while (!m_PendingWork.Wait(UpdateInteralMS))
- {
- UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
- }
- if (m_Errors.size() == 1)
- {
- throw std::runtime_error(m_Errors.front());
- }
- else if (m_Errors.size() > 1)
- {
- ExtendableStringBuilder<128> SB;
- SB.Append("Multiple errors:");
- for (const std::string& Error : m_Errors)
- {
- SB.Append(fmt::format("\n {}", Error));
- }
- throw std::runtime_error(SB.ToString());
- }
- }
- Latch& PendingWork() { return m_PendingWork; }
-
-private:
- std::atomic<bool>& m_AbortFlag;
- Latch m_PendingWork;
-
- RwLock m_ErrorLock;
- std::vector<std::string> m_Errors;
-};
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
new file mode 100644
index 000000000..639c6968c
--- /dev/null
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -0,0 +1,77 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/workthreadpool.h>
+
+#include <atomic>
+
+namespace zen {
+
+class ParallelWork
+{
+public:
+ ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag);
+
+ ~ParallelWork();
+
+ typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback;
+ typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
+ typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback;
+
+ void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {})
+ {
+ m_PendingWork.AddCount(1);
+ try
+ {
+ WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
+ auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
+ try
+ {
+ while (m_PauseFlag && !m_AbortFlag)
+ {
+ Sleep(2000);
+ }
+ Work(m_AbortFlag);
+ }
+ catch (...)
+ {
+ OnError(std::current_exception(), m_AbortFlag);
+ }
+ });
+ }
+ catch (const std::exception&)
+ {
+ m_PendingWork.CountDown();
+ throw;
+ }
+ }
+
+ void Abort() { m_AbortFlag = true; }
+
+ bool IsAborted() const { return m_AbortFlag.load(); }
+
+ void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback);
+
+ void Wait();
+
+ Latch& PendingWork() { return m_PendingWork; }
+
+private:
+ ExceptionCallback DefaultErrorFunction();
+ void RethrowErrors();
+
+ std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
+ bool m_DispatchComplete = false;
+ Latch m_PendingWork;
+
+ RwLock m_ErrorLock;
+ std::vector<std::exception_ptr> m_Errors;
+};
+
+void parallellwork_forcelink();
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h
index 9683ad720..df2033bca 100644
--- a/src/zenutil/include/zenutil/workerpools.h
+++ b/src/zenutil/include/zenutil/workerpools.h
@@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType);
// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread
WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType);
+// Worker pool with minimum number of worker threads, but at least one thread
+WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType);
+
// Special worker pool that does not use worker thread but issues all scheduled work on the calling thread
// This is useful for debugging when multiple async thread can make stepping in debugger complicated
WorkerThreadPool& GetSyncWorkerPool();