diff options
| author | Liam Mitchell <[email protected]> | 2025-07-29 23:04:15 +0000 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2025-07-29 23:04:15 +0000 |
| commit | bf0039cbab6dc21ce09c15be60878ee4208d8723 (patch) | |
| tree | 553353471925c72459b91563ccceb17accd51ec3 /src/zenutil/include | |
| parent | Always upload vcpkg logs on failure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bf0039cbab6dc21ce09c15be60878ee4208d8723.tar.xz zen-bf0039cbab6dc21ce09c15be60878ee4208d8723.zip | |
Merge branch 'main' into de/zen-service-command
Diffstat (limited to 'src/zenutil/include')
| -rw-r--r-- | src/zenutil/include/zenutil/bufferedwritefilecache.h | 106 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstorage.h | 27 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 57 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedcontent.h | 57 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkingcontroller.h | 41 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/commandlineoptions.h | 29 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/environmentoptions.h | 92 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h | 1 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupitersession.h | 28 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/logging/rotatingfilesink.h | 1 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallellwork.h | 117 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallelwork.h | 77 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/workerpools.h | 3 |
13 files changed, 462 insertions, 174 deletions
diff --git a/src/zenutil/include/zenutil/bufferedwritefilecache.h b/src/zenutil/include/zenutil/bufferedwritefilecache.h new file mode 100644 index 000000000..68d6c375e --- /dev/null +++ b/src/zenutil/include/zenutil/bufferedwritefilecache.h @@ -0,0 +1,106 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/basicfile.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +class CompositeBuffer; + +class BufferedWriteFileCache +{ +public: + BufferedWriteFileCache(const BufferedWriteFileCache&) = delete; + BufferedWriteFileCache& operator=(const BufferedWriteFileCache&) = delete; + + BufferedWriteFileCache(); + + ~BufferedWriteFileCache(); + + std::unique_ptr<BasicFile> Get(uint32_t FileIndex); + + void Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer); + + void Close(std::span<uint32_t> FileIndexes); + + class Local + { + public: + struct Writer + { + std::unique_ptr<BasicFile> File; + std::unique_ptr<BasicFileWriter> Writer; + + inline void Write(const CompositeBuffer& Chunk, uint64_t FileOffset) + { + if (Writer) + { + Writer->Write(Chunk, FileOffset); + } + else + { + File->Write(Chunk, FileOffset); + } + } + }; + + Local(const Local&) = delete; + Local& operator=(const Local&) = delete; + + explicit Local(BufferedWriteFileCache& Cache); + ~Local(); + + Writer* GetWriter(uint32_t FileIndex); + Writer* PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer); + + private: + tsl::robin_map<uint32_t, uint32_t> m_FileIndexToWriterIndex; + std::vector<std::unique_ptr<Writer>> m_ChunkWriters; + BufferedWriteFileCache& m_Cache; + }; + +private: + static constexpr size_t MaxHandlesPerPath = 7; + static constexpr size_t MaxBufferedCount = 1024; + struct TOpenHandles + { + BasicFile* Files[MaxHandlesPerPath]; + uint64_t Size = 0; + inline BasicFile* Pop() + { + if (Size > 0) + { + return Files[--Size]; + } + else + { + return nullptr; + } + } + inline bool Push(BasicFile* File) + { + if (Size < MaxHandlesPerPath) + { + Files[Size++] = File; + return true; + } + return false; + } + }; + static_assert(sizeof(TOpenHandles) == 64); + + RwLock m_WriterLock; + tsl::robin_map<uint32_t, uint32_t> m_ChunkWriters; + std::vector<TOpenHandles> m_OpenFiles; + std::atomic<uint32_t> m_CacheHitCount; + std::atomic<uint32_t> m_CacheMissCount; + std::atomic<uint32_t> m_OpenHandleCount; + std::atomic<uint32_t> m_DroppedHandleCount; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h index 9d2bab170..f49d4b42a 100644 --- a/src/zenutil/include/zenutil/buildstorage.h +++ b/src/zenutil/include/zenutil/buildstorage.h @@ -5,6 +5,10 @@ #include <zencore/compactbinary.h> #include <zenutil/chunkblock.h> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + namespace zen { class BuildStorage @@ -21,6 +25,7 @@ public: virtual ~BuildStorage() {} + virtual CbObject ListNamespaces(bool bRecursive = false) = 0; virtual CbObject ListBuilds(CbObject Query) = 0; virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) = 0; virtual CbObject GetBuild(const Oid& BuildId) = 0; @@ -43,16 +48,18 @@ public: virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset = 0, - uint64_t RangeBytes = (uint64_t)-1) = 0; - virtual std::vector<std::function<void()>> GetLargeBuildBlob( - const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0; - - virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) = 0; - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0; + uint64_t RangeBytes = (uint64_t)-1) = 0; + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete) = 0; + + [[nodiscard]] virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; + virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0; + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0; + + virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0; }; } // namespace zen diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h new file mode 100644 index 000000000..e1fb73fd4 --- /dev/null +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -0,0 +1,57 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> + +#include <zencore/compactbinary.h> +#include <zencore/compositebuffer.h> +#include <zenutil/chunkblock.h> + +namespace zen { + +class HttpClient; + +class BuildStorageCache +{ +public: + struct Statistics + { + std::atomic<uint64_t> TotalBytesRead = 0; + std::atomic<uint64_t> TotalBytesWritten = 0; + std::atomic<uint64_t> TotalRequestCount = 0; + std::atomic<uint64_t> TotalRequestTimeUs = 0; + std::atomic<uint64_t> TotalExecutionTimeUs = 0; + }; + + virtual ~BuildStorageCache() {} + + virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0; + virtual IoBuffer GetBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t RangeOffset = 0, + uint64_t RangeBytes = (uint64_t)-1) = 0; + + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0; + virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; + + struct BlobExistsResult + { + bool HasBody = 0; + bool HasMetadata = 0; + }; + + virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; + + virtual void Flush( + int32_t UpdateIntervalMS, + std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0; +}; + +std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount); +} // namespace zen diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index 57b55cb8e..306a5d990 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -67,7 +67,7 @@ FolderContent GetFolderContent(GetFolderContentStatistics& Stats, std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory, std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile, WorkerThreadPool& WorkerPool, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag); @@ -94,10 +94,31 @@ struct ChunkedFolderContent ChunkedContentData ChunkedContent; }; +struct ChunkedContentLookup +{ + struct ChunkSequenceLocation + { + uint32_t SequenceIndex = (uint32_t)-1; + uint64_t Offset = (uint64_t)-1; + }; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex; + std::vector<uint32_t> SequenceIndexChunkOrderOffset; + std::vector<ChunkSequenceLocation> ChunkSequenceLocations; + std::vector<size_t> + ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex + std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex + std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash + std::vector<uint32_t> PathExtensionHash; +}; + void SaveChunkedFolderContentToCompactBinary(const ChunkedFolderContent& Content, CbWriter& Output); ChunkedFolderContent LoadChunkedFolderContentToCompactBinary(CbObjectView Input); ChunkedFolderContent MergeChunkedFolderContents(const ChunkedFolderContent& Base, std::span<const ChunkedFolderContent> Overlays); +ChunkedFolderContent DeletePathsFromChunkedContent(const ChunkedFolderContent& Base, + const ChunkedContentLookup& BaseContentLookup, + std::span<const std::filesystem::path> DeletedPaths); ChunkedFolderContent DeletePathsFromChunkedContent(const ChunkedFolderContent& Base, std::span<const std::filesystem::path> DeletedPaths); struct ChunkingStatistics @@ -111,31 +132,15 @@ struct ChunkingStatistics uint64_t ElapsedWallTimeUS = 0; }; -ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, - WorkerThreadPool& WorkerPool, - const std::filesystem::path& RootPath, - const FolderContent& Content, - const ChunkingController& InChunkingController, - int32_t UpdateInteralMS, - std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, - std::atomic<bool>& AbortFlag); - -struct ChunkedContentLookup -{ - struct ChunkSequenceLocation - { - uint32_t SequenceIndex = (uint32_t)-1; - uint64_t Offset = (uint64_t)-1; - }; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex; - std::vector<uint32_t> SequenceIndexChunkOrderOffset; - std::vector<ChunkSequenceLocation> ChunkSequenceLocations; - std::vector<size_t> - ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex - std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex - std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash -}; +ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& RootPath, + const FolderContent& Content, + const ChunkingController& InChunkingController, + int32_t UpdateIntervalMS, + std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag); ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content); diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h index 246f4498a..315502265 100644 --- a/src/zenutil/include/zenutil/chunkingcontroller.h +++ b/src/zenutil/include/zenutil/chunkingcontroller.h @@ -11,7 +11,11 @@ namespace zen { -const std::vector<std::string_view> DefaultChunkingExcludeExtensions = {".exe", ".dll", ".pdb", ".self", ".mp4"}; +const std::vector<std::string> DefaultChunkingExcludeExtensions = + {".exe", ".dll", ".pdb", ".self", ".mp4", ".zip", ".7z", ".bzip", ".rar", ".gzip"}; +const std::vector<std::string> DefaultFixedChunkingExtensions = {".apk", ".nsp", ".xvc", ".pkg", ".dmg", ".ipa"}; +const bool DefaultChunkingExcludeElfFiles = true; +const bool DefaultChunkingExcludeMachOFiles = true; const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128u, .MaxSize = 128u * 1024u, @@ -19,7 +23,8 @@ const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128 const size_t DefaultChunkingFileSizeLimit = DefaultChunkedParams.MaxSize; -const uint32_t DefaultFixedChunkingChunkSize = 16u * 1024u * 1024u; +const uint64_t DefaultFixedChunkingChunkSize = 32u * 1024u * 1024u; +const uint64_t DefaultMinSizeForFixedChunking = DefaultFixedChunkingChunkSize * 8u; struct ChunkedInfoWithSource; @@ -38,17 +43,31 @@ public: virtual CbObject GetParameters() const = 0; }; -std::unique_ptr<ChunkingController> CreateBasicChunkingController( - std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions, - uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit, - const ChunkedParams& ChunkingParams = DefaultChunkedParams); +struct BasicChunkingControllerSettings +{ + std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions; + bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles; + bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles; + uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit; + ChunkedParams ChunkingParams = DefaultChunkedParams; +}; + +std::unique_ptr<ChunkingController> CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings); std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters); -std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking( - std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions, - uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit, - const ChunkedParams& ChunkingParams = DefaultChunkedParams, - uint32_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize); +struct ChunkingControllerWithFixedChunkingSettings +{ + std::vector<std::string> FixedChunkingExtensions = DefaultFixedChunkingExtensions; + std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions; + bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles; + bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles; + uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit; + ChunkedParams ChunkingParams = DefaultChunkedParams; + uint64_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize; + uint64_t MinSizeForFixedChunking = DefaultMinSizeForFixedChunking; +}; + +std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting); std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(CbObjectView Parameters); std::unique_ptr<ChunkingController> CreateChunkingController(std::string_view Name, CbObjectView Parameters); diff --git a/src/zenutil/include/zenutil/commandlineoptions.h b/src/zenutil/include/zenutil/commandlineoptions.h new file mode 100644 index 000000000..f927d41e5 --- /dev/null +++ b/src/zenutil/include/zenutil/commandlineoptions.h @@ -0,0 +1,29 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> +#include <filesystem> + +ZEN_THIRD_PARTY_INCLUDES_START + +namespace cxxopts::values { +// We declare this specialization before including cxxopts to make it stick +void parse_value(const std::string& text, std::filesystem::path& value); +} // namespace cxxopts::values + +#include <cxxopts.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +std::vector<std::string> ParseCommandLine(std::string_view CommandLine); +std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArgs); +void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path); +[[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path); +std::filesystem::path StringToPath(const std::string_view& Path); +std::string_view RemoveQuotes(const std::string_view& Arg); + +void commandlineoptions_forcelink(); // internal + +} // namespace zen diff --git a/src/zenutil/include/zenutil/environmentoptions.h b/src/zenutil/include/zenutil/environmentoptions.h new file mode 100644 index 000000000..7418608e4 --- /dev/null +++ b/src/zenutil/include/zenutil/environmentoptions.h @@ -0,0 +1,92 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/string.h> +#include <zenutil/commandlineoptions.h> + +namespace zen { + +class EnvironmentOptions +{ +public: + class OptionValue + { + public: + virtual void Parse(std::string_view Value) = 0; + + virtual ~OptionValue() {} + }; + + class StringOption : public OptionValue + { + public: + explicit StringOption(std::string& Value); + virtual void Parse(std::string_view Value) override; + std::string& RefValue; + }; + + class FilePathOption : public OptionValue + { + public: + explicit FilePathOption(std::filesystem::path& Value); + virtual void Parse(std::string_view Value) override; + std::filesystem::path& RefValue; + }; + + class BoolOption : public OptionValue + { + public: + explicit BoolOption(bool& Value); + virtual void Parse(std::string_view Value); + bool& RefValue; + }; + + template<Integral T> + class NumberOption : public OptionValue + { + public: + explicit NumberOption(T& Value) : RefValue(Value) {} + virtual void Parse(std::string_view Value) override + { + if (std::optional<T> OptionalValue = ParseInt<T>(Value); OptionalValue.has_value()) + { + RefValue = OptionalValue.value(); + } + } + T& RefValue; + }; + + struct Option + { + std::string CommandLineOptionName; + std::shared_ptr<OptionValue> Value; + }; + + std::shared_ptr<OptionValue> MakeOption(std::string& Value); + std::shared_ptr<OptionValue> MakeOption(std::filesystem::path& Value); + + template<Integral T> + std::shared_ptr<OptionValue> MakeOption(T& Value) + { + return std::make_shared<NumberOption<T>>(Value); + }; + + std::shared_ptr<OptionValue> MakeOption(bool& Value); + + template<typename T> + void AddOption(std::string_view EnvName, T& Value, std::string_view CommandLineOptionName = "") + { + OptionMap.insert_or_assign(std::string(EnvName), + Option{.CommandLineOptionName = std::string(CommandLineOptionName), .Value = MakeOption(Value)}); + }; + + EnvironmentOptions(); + + void Parse(const cxxopts::ParseResult& CmdLineResult); + +private: + std::unordered_map<std::string, Option> OptionMap; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h index 89fc70140..bbf070993 100644 --- a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h +++ b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h @@ -13,5 +13,6 @@ std::unique_ptr<BuildStorage> CreateJupiterBuildStorage(LoggerRef InLog, BuildStorage::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, + bool AllowRedirect, const std::filesystem::path& TempFolderPath); } // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h index 2c5fc73b8..b79790f25 100644 --- a/src/zenutil/include/zenutil/jupiter/jupitersession.h +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -65,7 +65,7 @@ struct FinalizeBuildPartResult : JupiterResult class JupiterSession { public: - JupiterSession(LoggerRef InLog, HttpClient& InHttpClient); + JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect); ~JupiterSession(); JupiterResult Authenticate(); @@ -102,6 +102,8 @@ public: std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + JupiterResult ListBuildNamespaces(); + JupiterResult ListBuildBuckets(std::string_view Namespace); JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload); JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); @@ -135,13 +137,14 @@ public: uint64_t PayloadSize, std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems); - JupiterResult GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, - std::vector<std::function<JupiterResult()>>& OutWorkItems); + JupiterResult GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems); JupiterResult PutBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, @@ -152,9 +155,15 @@ public: const Oid& BuildId, const Oid& PartId, const IoHash& RawHash); - JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount); JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload); + JupiterResult PutBuildPartStats(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& BuildPartId, + IoBuffer Payload); + private: inline LoggerRef Log() { return m_Log; } @@ -164,6 +173,7 @@ private: LoggerRef m_Log; HttpClient& m_HttpClient; + const bool m_AllowRedirect = false; }; } // namespace zen diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h index 758722156..cd28bdcb2 100644 --- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h +++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h @@ -27,7 +27,6 @@ public: { ZEN_MEMSCOPE(ELLMTag::Logging); - ZEN_MEMSCOPE(ELLMTag::Logging); std::error_code Ec; if (RotateOnOpen) { diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h deleted file mode 100644 index 79798fc8d..000000000 --- a/src/zenutil/include/zenutil/parallellwork.h +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/thread.h> -#include <zencore/workthreadpool.h> - -#include <atomic> - -namespace zen { - -class ParallellWork -{ -public: - ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {} - - ~ParallellWork() - { - // Make sure to call Wait before destroying - ZEN_ASSERT(m_PendingWork.Remaining() == 0); - } - - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction() - { - return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); }); - AbortFlag = true; - }; - } - - void ScheduleWork(WorkerThreadPool& WorkerPool, - std::function<void(std::atomic<bool>& AbortFlag)>&& Work, - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError) - { - m_PendingWork.AddCount(1); - try - { - WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] { - try - { - Work(m_AbortFlag); - } - catch (const AssertException& AssertEx) - { - OnError( - std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())), - m_AbortFlag); - } - catch (const std::system_error& SystemError) - { - if (IsOOM(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag); - } - else if (IsOOD(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag); - } - else - { - OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag); - } - } - catch (const std::exception& Ex) - { - OnError(Ex, m_AbortFlag); - } - m_PendingWork.CountDown(); - }); - } - catch (const std::exception&) - { - m_PendingWork.CountDown(); - throw; - } - } - - void Abort() { m_AbortFlag = true; } - - bool IsAborted() const { return m_AbortFlag.load(); } - - void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback) - { - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - m_PendingWork.CountDown(); - while (!m_PendingWork.Wait(UpdateInteralMS)) - { - UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); - } - if (m_Errors.size() == 1) - { - throw std::runtime_error(m_Errors.front()); - } - else if (m_Errors.size() > 1) - { - ExtendableStringBuilder<128> SB; - SB.Append("Multiple errors:"); - for (const std::string& Error : m_Errors) - { - SB.Append(fmt::format("\n {}", Error)); - } - throw std::runtime_error(SB.ToString()); - } - } - Latch& PendingWork() { return m_PendingWork; } - -private: - std::atomic<bool>& m_AbortFlag; - Latch m_PendingWork; - - RwLock m_ErrorLock; - std::vector<std::string> m_Errors; -}; - -} // namespace zen diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h new file mode 100644 index 000000000..639c6968c --- /dev/null +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -0,0 +1,77 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/scopeguard.h> +#include <zencore/thread.h> +#include <zencore/workthreadpool.h> + +#include <atomic> + +namespace zen { + +class ParallelWork +{ +public: + ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag); + + ~ParallelWork(); + + typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; + typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; + typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback; + + void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {}) + { + m_PendingWork.AddCount(1); + try + { + WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); + try + { + while (m_PauseFlag && !m_AbortFlag) + { + Sleep(2000); + } + Work(m_AbortFlag); + } + catch (...) + { + OnError(std::current_exception(), m_AbortFlag); + } + }); + } + catch (const std::exception&) + { + m_PendingWork.CountDown(); + throw; + } + } + + void Abort() { m_AbortFlag = true; } + + bool IsAborted() const { return m_AbortFlag.load(); } + + void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback); + + void Wait(); + + Latch& PendingWork() { return m_PendingWork; } + +private: + ExceptionCallback DefaultErrorFunction(); + void RethrowErrors(); + + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + bool m_DispatchComplete = false; + Latch m_PendingWork; + + RwLock m_ErrorLock; + std::vector<std::exception_ptr> m_Errors; +}; + +void parallellwork_forcelink(); + +} // namespace zen diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h index 9683ad720..df2033bca 100644 --- a/src/zenutil/include/zenutil/workerpools.h +++ b/src/zenutil/include/zenutil/workerpools.h @@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType); // Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType); +// Worker pool with minimum number of worker threads, but at least one thread +WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType); + // Special worker pool that does not use worker thread but issues all scheduled work on the calling thread // This is useful for debugging when multiple async thread can make stepping in debugger complicated WorkerThreadPool& GetSyncWorkerPool(); |