aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/include
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-14 09:50:00 +0100
committerDan Engelbrecht <[email protected]>2025-03-14 09:50:00 +0100
commit55c67aec301cfc99178ab54c6366cbc88f35d46a (patch)
tree84b4c73220f7dd041763b6d1919eedc8d0b90844 /src/zenutil/include
parentMerge remote-tracking branch 'origin/de/zen-service-command' into de/zen-serv... (diff)
parentfix quoted command lines arguments (#306) (diff)
downloadzen-55c67aec301cfc99178ab54c6366cbc88f35d46a.tar.xz
zen-55c67aec301cfc99178ab54c6366cbc88f35d46a.zip
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenutil/include')
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h58
-rw-r--r--src/zenutil/include/zenutil/cache/cachekey.h6
-rw-r--r--src/zenutil/include/zenutil/chunkblock.h40
-rw-r--r--src/zenutil/include/zenutil/chunkedcontent.h283
-rw-r--r--src/zenutil/include/zenutil/chunkedfile.h59
-rw-r--r--src/zenutil/include/zenutil/chunkingcontroller.h56
-rw-r--r--src/zenutil/include/zenutil/filebuildstorage.h16
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h17
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupiterclient.h11
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h67
-rw-r--r--src/zenutil/include/zenutil/logging.h1
-rw-r--r--src/zenutil/include/zenutil/logging/fullformatter.h7
-rw-r--r--src/zenutil/include/zenutil/parallellwork.h117
13 files changed, 706 insertions, 32 deletions
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
new file mode 100644
index 000000000..9d2bab170
--- /dev/null
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -0,0 +1,58 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zenutil/chunkblock.h>
+
+namespace zen {
+
+class BuildStorage
+{
+public:
+ struct Statistics
+ {
+ std::atomic<uint64_t> TotalBytesRead = 0;
+ std::atomic<uint64_t> TotalBytesWritten = 0;
+ std::atomic<uint64_t> TotalRequestCount = 0;
+ std::atomic<uint64_t> TotalRequestTimeUs = 0;
+ std::atomic<uint64_t> TotalExecutionTimeUs = 0;
+ };
+
+ virtual ~BuildStorage() {}
+
+ virtual CbObject ListBuilds(CbObject Query) = 0;
+ virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) = 0;
+ virtual CbObject GetBuild(const Oid& BuildId) = 0;
+ virtual void FinalizeBuild(const Oid& BuildId) = 0;
+
+ virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId,
+ const Oid& BuildPartId,
+ std::string_view PartName,
+ const CbObject& MetaData) = 0;
+ virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) = 0;
+ virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) = 0;
+ virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0;
+ virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::function<void(uint64_t, bool)>&& OnSentBytes) = 0;
+
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t RangeOffset = 0,
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(
+ const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0;
+
+ virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
+ virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) = 0;
+ virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/cache/cachekey.h b/src/zenutil/include/zenutil/cache/cachekey.h
index 741375946..0ab05f4f1 100644
--- a/src/zenutil/include/zenutil/cache/cachekey.h
+++ b/src/zenutil/include/zenutil/cache/cachekey.h
@@ -17,6 +17,12 @@ struct CacheKey
static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; }
+ // This should be used whenever the bucket name has already been validated to avoid redundant ToLower calls
+ static CacheKey CreateValidated(std::string&& BucketValidated, const IoHash& Hash)
+ {
+ return {.Bucket = std::move(BucketValidated), .Hash = Hash};
+ }
+
auto operator<=>(const CacheKey& that) const
{
if (auto b = caseSensitiveCompareStrings(Bucket, that.Bucket); b != std::strong_ordering::equal)
diff --git a/src/zenutil/include/zenutil/chunkblock.h b/src/zenutil/include/zenutil/chunkblock.h
new file mode 100644
index 000000000..277580c74
--- /dev/null
+++ b/src/zenutil/include/zenutil/chunkblock.h
@@ -0,0 +1,40 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compress.h>
+
+#include <optional>
+#include <vector>
+
+namespace zen {
+
+struct ThinChunkBlockDescription
+{
+ IoHash BlockHash;
+ std::vector<IoHash> ChunkRawHashes;
+};
+
+struct ChunkBlockDescription : public ThinChunkBlockDescription
+{
+ uint64_t HeaderSize;
+ std::vector<uint32_t> ChunkRawLengths;
+ std::vector<uint32_t> ChunkCompressedLengths;
+};
+
+std::vector<ChunkBlockDescription> ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject);
+ChunkBlockDescription ParseChunkBlockDescription(const CbObjectView& BlockObject);
+CbObject BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData);
+ChunkBlockDescription GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash);
+typedef std::function<std::pair<uint64_t, CompressedBuffer>(const IoHash& RawHash)> FetchChunkFunc;
+
+CompressedBuffer GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock);
+bool IterateChunkBlock(const SharedBuffer& BlockPayload,
+ std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor,
+ uint64_t& OutHeaderSize);
+std::vector<uint32_t> ReadChunkBlockHeader(const MemoryView BlockView, uint64_t& OutHeaderSize);
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
new file mode 100644
index 000000000..57b55cb8e
--- /dev/null
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -0,0 +1,283 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/iohash.h>
+
+#include <filesystem>
+#include <vector>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+class CbWriter;
+class ChunkingController;
+class WorkerThreadPool;
+
+enum class SourcePlatform
+{
+ Windows = 0,
+ Linux = 1,
+ MacOS = 2,
+ _Count
+};
+
+std::string_view ToString(SourcePlatform Platform);
+SourcePlatform FromString(std::string_view Platform, SourcePlatform Default);
+SourcePlatform GetSourceCurrentPlatform();
+
+struct FolderContent
+{
+ SourcePlatform Platform = GetSourceCurrentPlatform();
+ std::vector<std::filesystem::path> Paths;
+ std::vector<uint64_t> RawSizes;
+ std::vector<uint32_t> Attributes;
+ std::vector<uint64_t> ModificationTicks;
+
+ bool operator==(const FolderContent& Rhs) const;
+
+ bool AreKnownFilesEqual(const FolderContent& Rhs) const;
+ void UpdateState(const FolderContent& Rhs, std::vector<uint32_t>& PathIndexesOufOfDate);
+ static bool AreFileAttributesEqual(const uint32_t Lhs, const uint32_t Rhs);
+};
+
+FolderContent GetUpdatedContent(const FolderContent& Old,
+ const FolderContent& New,
+ std::vector<std::filesystem::path>& OutDeletedPathIndexes);
+
+void SaveFolderContentToCompactBinary(const FolderContent& Content, CbWriter& Output);
+FolderContent LoadFolderContentToCompactBinary(CbObjectView Input);
+
+struct GetFolderContentStatistics
+{
+ std::atomic<uint64_t> FoundFileCount = 0;
+ std::atomic<uint64_t> FoundFileByteCount = 0;
+ std::atomic<uint64_t> AcceptedFileCount = 0;
+ std::atomic<uint64_t> AcceptedFileByteCount = 0;
+ uint64_t ElapsedWallTimeUS = 0;
+};
+
+FolderContent GetFolderContent(GetFolderContentStatistics& Stats,
+ const std::filesystem::path& RootPath,
+ std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory,
+ std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile,
+ WorkerThreadPool& WorkerPool,
+ int32_t UpdateInteralMS,
+ std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
+ std::atomic<bool>& AbortFlag);
+
+struct ChunkedContentData
+{
+ // To describe one asset with a particular RawHash, find the index of the hash in SequenceRawHashes
+ // ChunkCounts for that index will be the number of indexes in ChunkOrders that describe
+ // the sequence of chunks required to reconstruct the asset.
+ // Offset into ChunkOrders is based on how many entries in ChunkOrders the previous [n - 1] SequenceRawHashes uses
+ std::vector<IoHash> SequenceRawHashes; // Raw hash for Chunk sequence
+ std::vector<uint32_t> ChunkCounts; // Chunk count of ChunkOrder for SequenceRawHashes[n]
+ std::vector<uint32_t> ChunkOrders; // Chunk sequence indexed into ChunkHashes, ChunkCounts[n] indexes per SequenceRawHashes[n]
+ std::vector<IoHash> ChunkHashes; // Unique chunk hashes
+ std::vector<uint64_t> ChunkRawSizes; // Unique chunk raw size for ChunkHash[n]
+};
+
+struct ChunkedFolderContent
+{
+ SourcePlatform Platform = GetSourceCurrentPlatform();
+ std::vector<std::filesystem::path> Paths;
+ std::vector<uint64_t> RawSizes;
+ std::vector<uint32_t> Attributes;
+ std::vector<IoHash> RawHashes;
+ ChunkedContentData ChunkedContent;
+};
+
+void SaveChunkedFolderContentToCompactBinary(const ChunkedFolderContent& Content, CbWriter& Output);
+ChunkedFolderContent LoadChunkedFolderContentToCompactBinary(CbObjectView Input);
+
+ChunkedFolderContent MergeChunkedFolderContents(const ChunkedFolderContent& Base, std::span<const ChunkedFolderContent> Overlays);
+ChunkedFolderContent DeletePathsFromChunkedContent(const ChunkedFolderContent& Base, std::span<const std::filesystem::path> DeletedPaths);
+
+struct ChunkingStatistics
+{
+ std::atomic<uint64_t> FilesProcessed = 0;
+ std::atomic<uint64_t> FilesChunked = 0;
+ std::atomic<uint64_t> BytesHashed = 0;
+ std::atomic<uint64_t> UniqueChunksFound = 0;
+ std::atomic<uint64_t> UniqueSequencesFound = 0;
+ std::atomic<uint64_t> UniqueBytesFound = 0;
+ uint64_t ElapsedWallTimeUS = 0;
+};
+
+ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& RootPath,
+ const FolderContent& Content,
+ const ChunkingController& InChunkingController,
+ int32_t UpdateInteralMS,
+ std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
+ std::atomic<bool>& AbortFlag);
+
+struct ChunkedContentLookup
+{
+ struct ChunkSequenceLocation
+ {
+ uint32_t SequenceIndex = (uint32_t)-1;
+ uint64_t Offset = (uint64_t)-1;
+ };
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex;
+ std::vector<uint32_t> SequenceIndexChunkOrderOffset;
+ std::vector<ChunkSequenceLocation> ChunkSequenceLocations;
+ std::vector<size_t>
+ ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex
+ std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex
+ std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash
+};
+
+ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content);
+
+inline std::pair<size_t, uint32_t>
+GetChunkSequenceLocationRange(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
+{
+ return std::make_pair(Lookup.ChunkSequenceLocationOffset[ChunkIndex], Lookup.ChunkSequenceLocationCounts[ChunkIndex]);
+}
+
+inline std::span<const ChunkedContentLookup::ChunkSequenceLocation>
+GetChunkSequenceLocations(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
+{
+ std::pair<size_t, uint32_t> Range = GetChunkSequenceLocationRange(Lookup, ChunkIndex);
+ return std::span<const ChunkedContentLookup::ChunkSequenceLocation>(Lookup.ChunkSequenceLocations).subspan(Range.first, Range.second);
+}
+
+inline uint32_t
+GetSequenceIndexForRawHash(const ChunkedContentLookup& Lookup, const IoHash& RawHash)
+{
+ return Lookup.RawHashToSequenceIndex.at(RawHash);
+}
+
+inline uint32_t
+GetChunkIndexForRawHash(const ChunkedContentLookup& Lookup, const IoHash& RawHash)
+{
+ return Lookup.RawHashToSequenceIndex.at(RawHash);
+}
+
+inline uint32_t
+GetFirstPathIndexForSeqeuenceIndex(const ChunkedContentLookup& Lookup, const uint32_t SequenceIndex)
+{
+ return Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+}
+
+inline uint32_t
+GetFirstPathIndexForRawHash(const ChunkedContentLookup& Lookup, const IoHash& RawHash)
+{
+ const uint32_t SequenceIndex = GetSequenceIndexForRawHash(Lookup, RawHash);
+ return GetFirstPathIndexForSeqeuenceIndex(Lookup, SequenceIndex);
+}
+
+namespace compactbinary_helpers {
+ template<typename Type>
+ void WriteArray(std::span<const Type> Values, std::string_view ArrayName, CbWriter& Output)
+ {
+ Output.BeginArray(ArrayName);
+ for (const Type Value : Values)
+ {
+ Output << Value;
+ }
+ Output.EndArray();
+ }
+
+ template<typename Type>
+ void WriteArray(const std::vector<Type>& Values, std::string_view ArrayName, CbWriter& Output)
+ {
+ WriteArray(std::span<const Type>(Values), ArrayName, Output);
+ }
+
+ template<>
+ inline void WriteArray(std::span<const std::filesystem::path> Values, std::string_view ArrayName, CbWriter& Output)
+ {
+ Output.BeginArray(ArrayName);
+ for (const std::filesystem::path& Path : Values)
+ {
+ Output.AddString((const char*)Path.generic_u8string().c_str());
+ }
+ Output.EndArray();
+ }
+
+ template<>
+ inline void WriteArray(const std::vector<std::filesystem::path>& Values, std::string_view ArrayName, CbWriter& Output)
+ {
+ WriteArray(std::span<const std::filesystem::path>(Values), ArrayName, Output);
+ }
+
+ inline void WriteBinaryAttachmentArray(std::span<const IoHash> Values, std::string_view ArrayName, CbWriter& Output)
+ {
+ Output.BeginArray(ArrayName);
+ for (const IoHash& Hash : Values)
+ {
+ Output.AddBinaryAttachment(Hash);
+ }
+ Output.EndArray();
+ }
+
+ inline void WriteBinaryAttachmentArray(const std::vector<IoHash>& Values, std::string_view ArrayName, CbWriter& Output)
+ {
+ WriteArray(std::span<const IoHash>(Values), ArrayName, Output);
+ }
+
+ inline void ReadArray(std::string_view ArrayName, CbObjectView Input, std::vector<uint32_t>& Result)
+ {
+ CbArrayView Array = Input[ArrayName].AsArrayView();
+ Result.reserve(Array.Num());
+ for (CbFieldView ItemView : Array)
+ {
+ Result.push_back(ItemView.AsUInt32());
+ }
+ }
+
+ inline void ReadArray(std::string_view ArrayName, CbObjectView Input, std::vector<uint64_t>& Result)
+ {
+ CbArrayView Array = Input[ArrayName].AsArrayView();
+ Result.reserve(Array.Num());
+ for (CbFieldView ItemView : Array)
+ {
+ Result.push_back(ItemView.AsUInt64());
+ }
+ }
+
+ inline void ReadArray(std::string_view ArrayName, CbObjectView Input, std::vector<std::filesystem::path>& Result)
+ {
+ CbArrayView Array = Input[ArrayName].AsArrayView();
+ Result.reserve(Array.Num());
+ for (CbFieldView ItemView : Array)
+ {
+ std::u8string_view U8Path = ItemView.AsU8String();
+ Result.push_back(std::filesystem::path(U8Path));
+ }
+ }
+
+ inline void ReadArray(std::string_view ArrayName, CbObjectView Input, std::vector<IoHash>& Result)
+ {
+ CbArrayView Array = Input[ArrayName].AsArrayView();
+ Result.reserve(Array.Num());
+ for (CbFieldView ItemView : Array)
+ {
+ Result.push_back(ItemView.AsHash());
+ }
+ }
+
+ inline void ReadBinaryAttachmentArray(std::string_view ArrayName, CbObjectView Input, std::vector<IoHash>& Result)
+ {
+ CbArrayView Array = Input[ArrayName].AsArrayView();
+ Result.reserve(Array.Num());
+ for (CbFieldView ItemView : Array)
+ {
+ Result.push_back(ItemView.AsBinaryAttachment());
+ }
+ }
+
+} // namespace compactbinary_helpers
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/chunkedfile.h b/src/zenutil/include/zenutil/chunkedfile.h
new file mode 100644
index 000000000..4cec80fdb
--- /dev/null
+++ b/src/zenutil/include/zenutil/chunkedfile.h
@@ -0,0 +1,59 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/zencore.h>
+
+#include <functional>
+#include <vector>
+
+namespace zen {
+
+class BasicFile;
+
+struct ChunkedInfo
+{
+ uint64_t RawSize = 0;
+ IoHash RawHash;
+ std::vector<uint32_t> ChunkSequence;
+ std::vector<IoHash> ChunkHashes;
+};
+
+struct ChunkSource
+{
+ uint64_t Offset; // 8
+ uint32_t Size; // 4
+};
+
+struct ChunkedInfoWithSource
+{
+ ChunkedInfo Info;
+ std::vector<ChunkSource> ChunkSources;
+};
+
+struct ChunkedParams
+{
+ bool UseThreshold = true;
+ size_t MinSize = (2u * 1024u) - 128u;
+ size_t MaxSize = (16u * 1024u);
+ size_t AvgSize = (3u * 1024u);
+};
+
+static const ChunkedParams UShaderByteCodeParams = {.UseThreshold = true, .MinSize = 17280, .MaxSize = 139264, .AvgSize = 36340};
+
+ChunkedInfoWithSource ChunkData(BasicFile& RawData,
+ uint64_t Offset,
+ uint64_t Size,
+ ChunkedParams Params = {},
+ std::atomic<uint64_t>* BytesProcessed = nullptr,
+ std::atomic<bool>* AbortFlag = nullptr);
+void Reconstruct(const ChunkedInfo& Info,
+ const std::filesystem::path& TargetPath,
+ std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk);
+IoBuffer SerializeChunkedInfo(const ChunkedInfo& Info);
+ChunkedInfo DeserializeChunkedInfo(IoBuffer& Buffer);
+
+void chunkedfile_forcelink();
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h
new file mode 100644
index 000000000..246f4498a
--- /dev/null
+++ b/src/zenutil/include/zenutil/chunkingcontroller.h
@@ -0,0 +1,56 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+
+#include <zenutil/chunkedfile.h>
+
+#include <atomic>
+#include <filesystem>
+
+namespace zen {
+
+const std::vector<std::string_view> DefaultChunkingExcludeExtensions = {".exe", ".dll", ".pdb", ".self", ".mp4"};
+
+const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128u,
+ .MaxSize = 128u * 1024u,
+ .AvgSize = ((8u * 4u) * 1024u) + 128u};
+
+const size_t DefaultChunkingFileSizeLimit = DefaultChunkedParams.MaxSize;
+
+const uint32_t DefaultFixedChunkingChunkSize = 16u * 1024u * 1024u;
+
+struct ChunkedInfoWithSource;
+
+class ChunkingController
+{
+public:
+ virtual ~ChunkingController() {}
+
+ // Return true if the input file was processed. If true is returned OutChunked will contain the chunked info
+ virtual bool ProcessFile(const std::filesystem::path& InputPath,
+ uint64_t RawSize,
+ ChunkedInfoWithSource& OutChunked,
+ std::atomic<uint64_t>& BytesProcessed,
+ std::atomic<bool>& AbortFlag) const = 0;
+ virtual std::string_view GetName() const = 0;
+ virtual CbObject GetParameters() const = 0;
+};
+
+std::unique_ptr<ChunkingController> CreateBasicChunkingController(
+ std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions,
+ uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit,
+ const ChunkedParams& ChunkingParams = DefaultChunkedParams);
+std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters);
+
+std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(
+ std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions,
+ uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit,
+ const ChunkedParams& ChunkingParams = DefaultChunkedParams,
+ uint32_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize);
+std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(CbObjectView Parameters);
+
+std::unique_ptr<ChunkingController> CreateChunkingController(std::string_view Name, CbObjectView Parameters);
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/filebuildstorage.h b/src/zenutil/include/zenutil/filebuildstorage.h
new file mode 100644
index 000000000..c95fb32e6
--- /dev/null
+++ b/src/zenutil/include/zenutil/filebuildstorage.h
@@ -0,0 +1,16 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zenutil/buildstorage.h>
+
+namespace zen {
+class HttpClient;
+
+std::unique_ptr<BuildStorage> CreateFileBuildStorage(const std::filesystem::path& StoragePath,
+ BuildStorage::Statistics& Stats,
+ bool EnableJsonOutput,
+ double LatencySec = 0.0,
+ double DelayPerKBSec = 0.0);
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
new file mode 100644
index 000000000..89fc70140
--- /dev/null
+++ b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
@@ -0,0 +1,17 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zenutil/buildstorage.h>
+
+namespace zen {
+class HttpClient;
+
+std::unique_ptr<BuildStorage> CreateJupiterBuildStorage(LoggerRef InLog,
+ HttpClient& InHttpClient,
+ BuildStorage::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath);
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterclient.h b/src/zenutil/include/zenutil/jupiter/jupiterclient.h
index defe50edc..8a51bd60a 100644
--- a/src/zenutil/include/zenutil/jupiter/jupiterclient.h
+++ b/src/zenutil/include/zenutil/jupiter/jupiterclient.h
@@ -44,12 +44,11 @@ public:
HttpClient& Client() { return m_HttpClient; }
private:
- LoggerRef m_Log;
- const std::string m_DefaultDdcNamespace;
- const std::string m_DefaultBlobStoreNamespace;
- const std::string m_ComputeCluster;
- std::function<HttpClientAccessToken()> m_TokenProvider;
- HttpClient m_HttpClient;
+ LoggerRef m_Log;
+ const std::string m_DefaultDdcNamespace;
+ const std::string m_DefaultBlobStoreNamespace;
+ const std::string m_ComputeCluster;
+ HttpClient m_HttpClient;
friend class JupiterSession;
};
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index 6a80332f4..2c5fc73b8 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -102,33 +102,49 @@ public:
std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
- JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
- JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- PutBuildPartResult PutBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- std::string_view PartName,
- const IoBuffer& Payload);
- JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
- JupiterResult PutBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload);
- JupiterResult GetBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- std::filesystem::path TempFolderPath);
+ JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload);
+ JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
+ JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ PutBuildPartResult PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload);
+ JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ JupiterResult PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload);
+ JupiterResult GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath,
+ uint64_t Offset = 0,
+ uint64_t Size = (uint64_t)-1);
+
+ JupiterResult PutMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems);
+ JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems);
JupiterResult PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
const IoBuffer& Payload);
FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
@@ -136,7 +152,8 @@ public:
const Oid& BuildId,
const Oid& PartId,
const IoHash& RawHash);
- JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload);
private:
inline LoggerRef Log() { return m_Log; }
diff --git a/src/zenutil/include/zenutil/logging.h b/src/zenutil/include/zenutil/logging.h
index ebf6372fc..d64eef207 100644
--- a/src/zenutil/include/zenutil/logging.h
+++ b/src/zenutil/include/zenutil/logging.h
@@ -32,6 +32,7 @@ struct LoggingOptions
bool IsDebug = false;
bool IsVerbose = false;
bool IsTest = false;
+ bool AllowAsync = true;
bool NoConsoleOutput = false;
std::filesystem::path AbsLogFile; // Absolute path to main log file
std::string LogId;
diff --git a/src/zenutil/include/zenutil/logging/fullformatter.h b/src/zenutil/include/zenutil/logging/fullformatter.h
index 07ad408fa..0326870e5 100644
--- a/src/zenutil/include/zenutil/logging/fullformatter.h
+++ b/src/zenutil/include/zenutil/logging/fullformatter.h
@@ -45,6 +45,8 @@ public:
std::chrono::seconds TimestampSeconds;
+ std::chrono::milliseconds millis;
+
if (m_UseFullDate)
{
TimestampSeconds = std::chrono::duration_cast<std::chrono::seconds>(msg.time.time_since_epoch());
@@ -69,6 +71,8 @@ public:
spdlog::details::fmt_helper::pad2(m_CachedLocalTm.tm_sec, m_CachedDatetime);
m_CachedDatetime.push_back('.');
}
+
+ millis = spdlog::details::fmt_helper::time_fraction<std::chrono::milliseconds>(msg.time);
}
else
{
@@ -97,6 +101,8 @@ public:
spdlog::details::fmt_helper::pad2(LogSecs, m_CachedDatetime);
m_CachedDatetime.push_back('.');
}
+
+ millis = std::chrono::duration_cast<std::chrono::milliseconds>(ElapsedTime - TimestampSeconds);
}
{
@@ -104,7 +110,6 @@ public:
OutBuffer.append(m_CachedDatetime.begin(), m_CachedDatetime.end());
}
- auto millis = spdlog::details::fmt_helper::time_fraction<std::chrono::milliseconds>(msg.time);
spdlog::details::fmt_helper::pad3(static_cast<uint32_t>(millis.count()), OutBuffer);
OutBuffer.push_back(']');
OutBuffer.push_back(' ');
diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h
new file mode 100644
index 000000000..79798fc8d
--- /dev/null
+++ b/src/zenutil/include/zenutil/parallellwork.h
@@ -0,0 +1,117 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/thread.h>
+#include <zencore/workthreadpool.h>
+
+#include <atomic>
+
+namespace zen {
+
+class ParallellWork
+{
+public:
+ ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {}
+
+ ~ParallellWork()
+ {
+ // Make sure to call Wait before destroying
+ ZEN_ASSERT(m_PendingWork.Remaining() == 0);
+ }
+
+ std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction()
+ {
+ return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) {
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); });
+ AbortFlag = true;
+ };
+ }
+
+ void ScheduleWork(WorkerThreadPool& WorkerPool,
+ std::function<void(std::atomic<bool>& AbortFlag)>&& Work,
+ std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError)
+ {
+ m_PendingWork.AddCount(1);
+ try
+ {
+ WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] {
+ try
+ {
+ Work(m_AbortFlag);
+ }
+ catch (const AssertException& AssertEx)
+ {
+ OnError(
+ std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())),
+ m_AbortFlag);
+ }
+ catch (const std::system_error& SystemError)
+ {
+ if (IsOOM(SystemError.code()))
+ {
+ OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag);
+ }
+ else if (IsOOD(SystemError.code()))
+ {
+ OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag);
+ }
+ else
+ {
+ OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ OnError(Ex, m_AbortFlag);
+ }
+ m_PendingWork.CountDown();
+ });
+ }
+ catch (const std::exception&)
+ {
+ m_PendingWork.CountDown();
+ throw;
+ }
+ }
+
+ void Abort() { m_AbortFlag = true; }
+
+ bool IsAborted() const { return m_AbortFlag.load(); }
+
+ void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback)
+ {
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+ while (!m_PendingWork.Wait(UpdateInteralMS))
+ {
+ UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
+ }
+ if (m_Errors.size() == 1)
+ {
+ throw std::runtime_error(m_Errors.front());
+ }
+ else if (m_Errors.size() > 1)
+ {
+ ExtendableStringBuilder<128> SB;
+ SB.Append("Multiple errors:");
+ for (const std::string& Error : m_Errors)
+ {
+ SB.Append(fmt::format("\n {}", Error));
+ }
+ throw std::runtime_error(SB.ToString());
+ }
+ }
+ Latch& PendingWork() { return m_PendingWork; }
+
+private:
+ std::atomic<bool>& m_AbortFlag;
+ Latch m_PendingWork;
+
+ RwLock m_ErrorLock;
+ std::vector<std::string> m_Errors;
+};
+
+} // namespace zen