aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/include
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-01-16 09:19:08 +0100
committerGitHub Enterprise <[email protected]>2025-01-16 09:19:08 +0100
commita5158f9fc806d506590dd9bf0e3282cb76c3ac4e (patch)
tree95a6dd46ad0520de4018e08ef6b3f409e25af3c3 /src/zenutil/include
parent5.5.17 (diff)
downloadzen-a5158f9fc806d506590dd9bf0e3282cb76c3ac4e.tar.xz
zen-a5158f9fc806d506590dd9bf0e3282cb76c3ac4e.zip
move basicfile.h/cpp -> zencore (#273)
move jupiter.h/cpp -> zenutil move packageformat.h/.cpp -> zenhttp zenutil now depends on zenhttp instead of the inverse
Diffstat (limited to 'src/zenutil/include')
-rw-r--r--src/zenutil/include/zenutil/basicfile.h185
-rw-r--r--src/zenutil/include/zenutil/jupiter.h256
-rw-r--r--src/zenutil/include/zenutil/logging/rotatingfilesink.h2
-rw-r--r--src/zenutil/include/zenutil/packageformat.h164
4 files changed, 257 insertions, 350 deletions
diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h
deleted file mode 100644
index 03c5605df..000000000
--- a/src/zenutil/include/zenutil/basicfile.h
+++ /dev/null
@@ -1,185 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/zencore.h>
-
-#include <zencore/compositebuffer.h>
-#include <zencore/enumflags.h>
-#include <zencore/iobuffer.h>
-
-#include <filesystem>
-#include <functional>
-
-namespace zen {
-
-class CbObject;
-
-/**
- * Probably the most basic file abstraction in the universe
- *
- * One thing of note is that there is no notion of a "current file position"
- * in this API -- all reads and writes are done from explicit offsets in
- * the file. This avoids concurrency issues which can occur otherwise.
- *
- */
-
-class BasicFile
-{
-public:
- BasicFile() = default;
- ~BasicFile();
-
- BasicFile(const BasicFile&) = delete;
- BasicFile& operator=(const BasicFile&) = delete;
-
- enum class Mode : uint32_t
- {
- kRead = 0, // Opens a existing file for read only
- kWrite = 1, // Opens (or creates) a file for read and write
- kTruncate = 2, // Opens (or creates) a file for read and write and sets the size to zero
- kDelete = 3, // Opens (or creates) a file for read and write allowing .DeleteFile file disposition to be set
- kTruncateDelete =
- 4, // Opens (or creates) a file for read and write and sets the size to zero allowing .DeleteFile file disposition to be set
- kModeMask = 0x0007,
- kPreventDelete = 0x1000'0000, // Do not open with delete sharing mode (prevent other processes from deleting file while open)
- kPreventWrite = 0x2000'0000, // Do not open with write sharing mode (prevent other processes from writing to file while open)
- };
-
- void Open(const std::filesystem::path& FileName, Mode Mode);
- void Open(const std::filesystem::path& FileName, Mode Mode, std::error_code& Ec);
- void Open(const std::filesystem::path& FileName, Mode Mode, std::function<bool(std::error_code& Ec)>&& RetryCallback);
- void Close();
- void Read(void* Data, uint64_t Size, uint64_t FileOffset);
- IoBuffer ReadRange(uint64_t FileOffset, uint64_t ByteCount);
- void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
- void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
- void Write(MemoryView Data, uint64_t FileOffset);
- void Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec);
- uint64_t Write(CompositeBuffer Data, uint64_t FileOffset, std::error_code& Ec);
- void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
- void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec);
- void Flush();
- [[nodiscard]] uint64_t FileSize() const;
- [[nodiscard]] uint64_t FileSize(std::error_code& Ec) const;
- void SetFileSize(uint64_t FileSize);
- IoBuffer ReadAll();
- void WriteAll(IoBuffer Data, std::error_code& Ec);
- void Attach(void* Handle);
- void* Detach();
-
- inline void* Handle() { return m_FileHandle; }
- bool IsOpen() const { return m_FileHandle != nullptr; }
-
-protected:
- void* m_FileHandle = nullptr; // This is either null or valid
-private:
-};
-
-ENUM_CLASS_FLAGS(BasicFile::Mode);
-
-/**
- * Simple abstraction for a temporary file
- *
- * Works like a regular BasicFile but implements a simple mechanism to allow creating
- * a temporary file for writing in a directory which may later be moved atomically
- * into the intended location after it has been fully written to.
- *
- */
-
-class TemporaryFile : public BasicFile
-{
-public:
- TemporaryFile() = default;
- ~TemporaryFile();
-
- TemporaryFile(const TemporaryFile&) = delete;
- TemporaryFile& operator=(const TemporaryFile&) = delete;
-
- void CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec);
- void MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec);
- const std::filesystem::path& GetPath() const { return m_TempPath; }
-
- static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data);
- static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data, std::error_code& OutEc);
-
-private:
- void Close();
- std::filesystem::path m_TempPath;
-
- using BasicFile::Open;
-};
-
-/** Lock file abstraction
-
- */
-
-class LockFile : protected BasicFile
-{
-public:
- LockFile();
- ~LockFile();
-
- void Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec);
- void Update(CbObject Payload, std::error_code& Ec);
-
-private:
-};
-
-/** Adds a layer of buffered reading to a BasicFile
-
-This class is not intended for concurrent access, it is not thread safe.
-
-*/
-
-class BasicFileBuffer
-{
-public:
- BasicFileBuffer(BasicFile& Base, uint64_t BufferSize);
- ~BasicFileBuffer();
-
- void Read(void* Data, uint64_t Size, uint64_t FileOffset);
- MemoryView MakeView(uint64_t Size, uint64_t FileOffset);
-
- template<typename T>
- const T* MakeView(uint64_t FileOffset)
- {
- MemoryView View = MakeView(sizeof(T), FileOffset);
- return reinterpret_cast<const T*>(View.GetData());
- }
-
-private:
- BasicFile& m_Base;
- uint8_t* m_Buffer;
- const uint64_t m_BufferSize;
- uint64_t m_Size;
- uint64_t m_BufferStart;
- uint64_t m_BufferEnd;
-};
-
-/** Adds a layer of buffered writing to a BasicFile
-
-This class is not intended for concurrent access, it is not thread safe.
-
-*/
-
-class BasicFileWriter
-{
-public:
- BasicFileWriter(BasicFile& Base, uint64_t BufferSize);
- ~BasicFileWriter();
-
- void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
- void Flush();
-
-private:
- BasicFile& m_Base;
- uint8_t* m_Buffer;
- const uint64_t m_BufferSize;
- uint64_t m_BufferStart;
- uint64_t m_BufferEnd;
-};
-
-ZENCORE_API void basicfile_forcelink();
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter.h b/src/zenutil/include/zenutil/jupiter.h
new file mode 100644
index 000000000..50e4ad68a
--- /dev/null
+++ b/src/zenutil/include/zenutil/jupiter.h
@@ -0,0 +1,256 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenbase/refcount.h>
+#include <zencore/iohash.h>
+#include <zencore/logging.h>
+#include <zencore/thread.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/httpserver.h>
+
+#include <atomic>
+#include <chrono>
+#include <list>
+#include <memory>
+#include <set>
+#include <vector>
+
+struct ZenCacheValue;
+
+namespace cpr {
+class Session;
+}
+
+namespace zen {
+
+class CbObjectView;
+class CloudCacheClient;
+class IoBuffer;
+struct IoHash;
+
+/**
+ * Cached access token, for use with `Authorization:` header
+ */
+struct CloudCacheAccessToken
+{
+ using Clock = std::chrono::system_clock;
+ using TimePoint = Clock::time_point;
+
+ static constexpr int64_t ExpireMarginInSeconds = 30;
+
+ std::string Value;
+ TimePoint ExpireTime;
+
+ bool IsValid() const
+ {
+ return Value.empty() == false &&
+ ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count();
+ }
+};
+
+struct CloudCacheResult
+{
+ IoBuffer Response;
+ uint64_t SentBytes{};
+ uint64_t ReceivedBytes{};
+ double ElapsedSeconds{};
+ int32_t ErrorCode{};
+ std::string Reason;
+ bool Success = false;
+};
+
+struct PutRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+ IoHash RawHash;
+};
+
+struct FinalizeRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
+struct CloudCacheExistsResult : CloudCacheResult
+{
+ std::set<IoHash> Needs;
+};
+
+struct GetObjectReferencesResult : CloudCacheResult
+{
+ std::set<IoHash> References;
+};
+
+struct PutBuildPartResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+ IoHash RawHash;
+};
+
+struct FinalizeBuildPartResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
+/**
+ * Context for performing Jupiter operations
+ *
+ * Maintains an HTTP connection so that subsequent operations don't need to go
+ * through the whole connection setup process
+ *
+ */
+class CloudCacheSession
+{
+public:
+ CloudCacheSession(CloudCacheClient* CacheClient);
+ ~CloudCacheSession();
+
+ CloudCacheResult Authenticate();
+
+ CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
+ CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
+ CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath = {});
+
+ PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
+ CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
+
+ FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
+
+ CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
+
+ GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key);
+
+ CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key);
+
+ CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+
+ std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+
+ CloudCacheResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
+ CloudCacheResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ CloudCacheResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ PutBuildPartResult PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload);
+ CloudCacheResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ CloudCacheResult PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload);
+ CloudCacheResult GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath);
+ CloudCacheResult PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ const IoBuffer& Payload);
+ FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash);
+ CloudCacheResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+
+ CloudCacheClient& Client() { return *m_CacheClient; };
+
+private:
+ inline LoggerRef Log() { return m_Log; }
+
+ CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
+
+ CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
+
+ LoggerRef m_Log;
+ RefPtr<CloudCacheClient> m_CacheClient;
+};
+
+/**
+ * Access token provider interface
+ */
+class CloudCacheTokenProvider
+{
+public:
+ virtual ~CloudCacheTokenProvider() = default;
+
+ virtual CloudCacheAccessToken AcquireAccessToken() = 0;
+
+ static std::unique_ptr<CloudCacheTokenProvider> CreateFromStaticToken(CloudCacheAccessToken Token);
+
+ struct OAuthClientCredentialsParams
+ {
+ std::string_view Url;
+ std::string_view ClientId;
+ std::string_view ClientSecret;
+ };
+
+ static std::unique_ptr<CloudCacheTokenProvider> CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params);
+
+ static std::unique_ptr<CloudCacheTokenProvider> CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback);
+};
+
+struct CloudCacheClientOptions
+{
+ std::string_view Name;
+ std::string_view ServiceUrl;
+ std::string_view DdcNamespace;
+ std::string_view BlobStoreNamespace;
+ std::string_view ComputeCluster;
+ std::chrono::milliseconds ConnectTimeout{5000};
+ std::chrono::milliseconds Timeout{};
+ bool AssumeHttp2 = false;
+ bool AllowResume = false;
+ uint8_t RetryCount = 0;
+};
+
+/**
+ * Jupiter upstream cache client
+ */
+class CloudCacheClient : public RefCounted
+{
+public:
+ CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider);
+ ~CloudCacheClient();
+
+ std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
+ std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
+ std::string_view ComputeCluster() const { return m_ComputeCluster; }
+ std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); }
+
+ LoggerRef Logger() { return m_Log; }
+
+private:
+ LoggerRef m_Log;
+ const std::string m_DefaultDdcNamespace;
+ const std::string m_DefaultBlobStoreNamespace;
+ const std::string m_ComputeCluster;
+ const std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider;
+ HttpClient m_HttpClient;
+
+ friend class CloudCacheSession;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
index 3eb9021dd..758722156 100644
--- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h
+++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
@@ -2,8 +2,8 @@
#pragma once
+#include <zencore/basicfile.h>
#include <zencore/memory/llm.h>
-#include <zenutil/basicfile.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <spdlog/details/log_msg.h>
diff --git a/src/zenutil/include/zenutil/packageformat.h b/src/zenutil/include/zenutil/packageformat.h
deleted file mode 100644
index c90b840da..000000000
--- a/src/zenutil/include/zenutil/packageformat.h
+++ /dev/null
@@ -1,164 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/compactbinarypackage.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-
-#include <functional>
-#include <gsl/gsl-lite.hpp>
-
-namespace zen {
-
-class IoBuffer;
-class CbPackage;
-class CompositeBuffer;
-
-/** _____ _ _____ _
- / ____| | | __ \ | |
- | | | |__ | |__) |_ _ ___| | ____ _ __ _ ___
- | | | '_ \| ___/ _` |/ __| |/ / _` |/ _` |/ _ \
- | |____| |_) | | | (_| | (__| < (_| | (_| | __/
- \_____|_.__/|_| \__,_|\___|_|\_\__,_|\__, |\___|
- __/ |
- |___/
-
- Structures and code related to handling CbPackage transactions
-
- CbPackage instances are marshaled across the wire using a distinct message
- format. We don't use the CbPackage serialization format provided by the
- CbPackage implementation itself since that does not provide much flexibility
- in how the attachment payloads are transmitted. The scheme below separates
- metadata cleanly from payloads and this enables us to more efficiently
- transmit them either via sendfile/TransmitFile like mechanisms, or by
- reference/memory mapping in the local case.
- */
-
-struct CbPackageHeader
-{
- uint32_t HeaderMagic;
- uint32_t AttachmentCount; // TODO: should add ability to opt out of implicit root document?
- uint32_t Reserved1;
- uint32_t Reserved2;
-};
-
-static_assert(sizeof(CbPackageHeader) == 16);
-
-enum : uint32_t
-{
- kCbPkgMagic = 0xaa77aacc
-};
-
-struct CbAttachmentEntry
-{
- uint64_t PayloadSize; // Size of the associated payload data in the message
- uint32_t Flags; // See flags below
- IoHash AttachmentHash; // Content Id for the attachment
-
- enum
- {
- kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format
- kIsObject = (1u << 1), // Is compact binary object
- kIsError = (1u << 2), // Is error (compact binary formatted) object
- kIsLocalRef = (1u << 3), // Is "local reference"
- };
-};
-
-struct CbAttachmentReferenceHeader
-{
- uint64_t PayloadByteOffset = 0;
- uint64_t PayloadByteSize = ~0u;
- uint16_t AbsolutePathLength = 0;
-
- // This header will be followed by UTF8 encoded absolute path to backing file
-};
-
-static_assert(sizeof(CbAttachmentEntry) == 32);
-
-enum class FormatFlags
-{
- kDefault = 0,
- kAllowLocalReferences = (1u << 0),
- kDenyPartialLocalReferences = (1u << 1)
-};
-
-gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags);
-
-enum class RpcAcceptOptions : uint16_t
-{
- kNone = 0,
- kAllowLocalReferences = (1u << 0),
- kAllowPartialLocalReferences = (1u << 1),
- kAllowPartialCacheChunks = (1u << 2)
-};
-
-gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions);
-
-std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle = nullptr);
-CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle = nullptr);
-CbPackage ParsePackageMessage(
- IoBuffer Payload,
- std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer {
- return IoBuffer{Size};
- });
-bool IsPackageMessage(IoBuffer Payload);
-
-bool ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage);
-
-std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, void* TargetProcessHandle = nullptr);
-CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, void* TargetProcessHandle = nullptr);
-
-/** Streaming reader for compact binary packages
-
- The goal is to ultimately support zero-copy I/O, but for now there'll be some
- copying involved on some platforms at least.
-
- This approach to deserializing CbPackage data is more efficient than
- `ParsePackageMessage` since it does not require the entire message to
- be resident in a memory buffer
-
- */
-class CbPackageReader
-{
-public:
- CbPackageReader();
- ~CbPackageReader();
-
- void SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer);
-
- /** Process compact binary package data stream
-
- The data stream must be in the serialization format produced by FormatPackageMessage
-
- \return How many bytes must be fed to this function in the next call
- */
- uint64_t ProcessPackageHeaderData(const void* Data, uint64_t DataBytes);
-
- void Finalize();
- const std::vector<CbAttachment>& GetAttachments() { return m_Attachments; }
- CbObject GetRootObject() { return m_RootObject; }
- std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; }
-
-private:
- enum class State
- {
- kInitialState,
- kReadingHeader,
- kReadingAttachmentEntries,
- kReadingBuffers
- } m_CurrentState = State::kInitialState;
-
- std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> m_CreateBuffer;
- std::vector<IoBuffer> m_PayloadBuffers;
- std::vector<CbAttachmentEntry> m_AttachmentEntries;
- std::vector<CbAttachment> m_Attachments;
- CbObject m_RootObject;
- CbPackageHeader m_PackageHeader;
-
- IoBuffer MarshalLocalChunkReference(IoBuffer AttachmentBuffer);
-};
-
-void forcelink_packageformat();
-
-} // namespace zen