diff options
| author | Dan Engelbrecht <[email protected]> | 2025-01-16 09:19:08 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-01-16 09:19:08 +0100 |
| commit | a5158f9fc806d506590dd9bf0e3282cb76c3ac4e (patch) | |
| tree | 95a6dd46ad0520de4018e08ef6b3f409e25af3c3 /src/zenutil/include | |
| parent | 5.5.17 (diff) | |
| download | zen-a5158f9fc806d506590dd9bf0e3282cb76c3ac4e.tar.xz zen-a5158f9fc806d506590dd9bf0e3282cb76c3ac4e.zip | |
move basicfile.h/cpp -> zencore (#273)
move jupiter.h/cpp -> zenutil
move packageformat.h/.cpp -> zenhttp
zenutil now depends on zenhttp instead of the inverse
Diffstat (limited to 'src/zenutil/include')
| -rw-r--r-- | src/zenutil/include/zenutil/basicfile.h | 185 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter.h | 256 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/logging/rotatingfilesink.h | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/packageformat.h | 164 |
4 files changed, 257 insertions, 350 deletions
diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h deleted file mode 100644 index 03c5605df..000000000 --- a/src/zenutil/include/zenutil/basicfile.h +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/zencore.h> - -#include <zencore/compositebuffer.h> -#include <zencore/enumflags.h> -#include <zencore/iobuffer.h> - -#include <filesystem> -#include <functional> - -namespace zen { - -class CbObject; - -/** - * Probably the most basic file abstraction in the universe - * - * One thing of note is that there is no notion of a "current file position" - * in this API -- all reads and writes are done from explicit offsets in - * the file. This avoids concurrency issues which can occur otherwise. - * - */ - -class BasicFile -{ -public: - BasicFile() = default; - ~BasicFile(); - - BasicFile(const BasicFile&) = delete; - BasicFile& operator=(const BasicFile&) = delete; - - enum class Mode : uint32_t - { - kRead = 0, // Opens a existing file for read only - kWrite = 1, // Opens (or creates) a file for read and write - kTruncate = 2, // Opens (or creates) a file for read and write and sets the size to zero - kDelete = 3, // Opens (or creates) a file for read and write allowing .DeleteFile file disposition to be set - kTruncateDelete = - 4, // Opens (or creates) a file for read and write and sets the size to zero allowing .DeleteFile file disposition to be set - kModeMask = 0x0007, - kPreventDelete = 0x1000'0000, // Do not open with delete sharing mode (prevent other processes from deleting file while open) - kPreventWrite = 0x2000'0000, // Do not open with write sharing mode (prevent other processes from writing to file while open) - }; - - void Open(const std::filesystem::path& FileName, Mode Mode); - void Open(const std::filesystem::path& FileName, Mode Mode, std::error_code& Ec); - void Open(const std::filesystem::path& FileName, Mode Mode, std::function<bool(std::error_code& Ec)>&& RetryCallback); - void Close(); - void Read(void* Data, uint64_t Size, uint64_t FileOffset); - IoBuffer ReadRange(uint64_t FileOffset, uint64_t ByteCount); - void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); - void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); - void Write(MemoryView Data, uint64_t FileOffset); - void Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec); - uint64_t Write(CompositeBuffer Data, uint64_t FileOffset, std::error_code& Ec); - void Write(const void* Data, uint64_t Size, uint64_t FileOffset); - void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec); - void Flush(); - [[nodiscard]] uint64_t FileSize() const; - [[nodiscard]] uint64_t FileSize(std::error_code& Ec) const; - void SetFileSize(uint64_t FileSize); - IoBuffer ReadAll(); - void WriteAll(IoBuffer Data, std::error_code& Ec); - void Attach(void* Handle); - void* Detach(); - - inline void* Handle() { return m_FileHandle; } - bool IsOpen() const { return m_FileHandle != nullptr; } - -protected: - void* m_FileHandle = nullptr; // This is either null or valid -private: -}; - -ENUM_CLASS_FLAGS(BasicFile::Mode); - -/** - * Simple abstraction for a temporary file - * - * Works like a regular BasicFile but implements a simple mechanism to allow creating - * a temporary file for writing in a directory which may later be moved atomically - * into the intended location after it has been fully written to. - * - */ - -class TemporaryFile : public BasicFile -{ -public: - TemporaryFile() = default; - ~TemporaryFile(); - - TemporaryFile(const TemporaryFile&) = delete; - TemporaryFile& operator=(const TemporaryFile&) = delete; - - void CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec); - void MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec); - const std::filesystem::path& GetPath() const { return m_TempPath; } - - static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data); - static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data, std::error_code& OutEc); - -private: - void Close(); - std::filesystem::path m_TempPath; - - using BasicFile::Open; -}; - -/** Lock file abstraction - - */ - -class LockFile : protected BasicFile -{ -public: - LockFile(); - ~LockFile(); - - void Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec); - void Update(CbObject Payload, std::error_code& Ec); - -private: -}; - -/** Adds a layer of buffered reading to a BasicFile - -This class is not intended for concurrent access, it is not thread safe. - -*/ - -class BasicFileBuffer -{ -public: - BasicFileBuffer(BasicFile& Base, uint64_t BufferSize); - ~BasicFileBuffer(); - - void Read(void* Data, uint64_t Size, uint64_t FileOffset); - MemoryView MakeView(uint64_t Size, uint64_t FileOffset); - - template<typename T> - const T* MakeView(uint64_t FileOffset) - { - MemoryView View = MakeView(sizeof(T), FileOffset); - return reinterpret_cast<const T*>(View.GetData()); - } - -private: - BasicFile& m_Base; - uint8_t* m_Buffer; - const uint64_t m_BufferSize; - uint64_t m_Size; - uint64_t m_BufferStart; - uint64_t m_BufferEnd; -}; - -/** Adds a layer of buffered writing to a BasicFile - -This class is not intended for concurrent access, it is not thread safe. - -*/ - -class BasicFileWriter -{ -public: - BasicFileWriter(BasicFile& Base, uint64_t BufferSize); - ~BasicFileWriter(); - - void Write(const void* Data, uint64_t Size, uint64_t FileOffset); - void Flush(); - -private: - BasicFile& m_Base; - uint8_t* m_Buffer; - const uint64_t m_BufferSize; - uint64_t m_BufferStart; - uint64_t m_BufferEnd; -}; - -ZENCORE_API void basicfile_forcelink(); - -} // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter.h b/src/zenutil/include/zenutil/jupiter.h new file mode 100644 index 000000000..50e4ad68a --- /dev/null +++ b/src/zenutil/include/zenutil/jupiter.h @@ -0,0 +1,256 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenbase/refcount.h> +#include <zencore/iohash.h> +#include <zencore/logging.h> +#include <zencore/thread.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/httpserver.h> + +#include <atomic> +#include <chrono> +#include <list> +#include <memory> +#include <set> +#include <vector> + +struct ZenCacheValue; + +namespace cpr { +class Session; +} + +namespace zen { + +class CbObjectView; +class CloudCacheClient; +class IoBuffer; +struct IoHash; + +/** + * Cached access token, for use with `Authorization:` header + */ +struct CloudCacheAccessToken +{ + using Clock = std::chrono::system_clock; + using TimePoint = Clock::time_point; + + static constexpr int64_t ExpireMarginInSeconds = 30; + + std::string Value; + TimePoint ExpireTime; + + bool IsValid() const + { + return Value.empty() == false && + ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count(); + } +}; + +struct CloudCacheResult +{ + IoBuffer Response; + uint64_t SentBytes{}; + uint64_t ReceivedBytes{}; + double ElapsedSeconds{}; + int32_t ErrorCode{}; + std::string Reason; + bool Success = false; +}; + +struct PutRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; + IoHash RawHash; +}; + +struct FinalizeRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + +struct CloudCacheExistsResult : CloudCacheResult +{ + std::set<IoHash> Needs; +}; + +struct GetObjectReferencesResult : CloudCacheResult +{ + std::set<IoHash> References; +}; + +struct PutBuildPartResult : CloudCacheResult +{ + std::vector<IoHash> Needs; + IoHash RawHash; +}; + +struct FinalizeBuildPartResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + +/** + * Context for performing Jupiter operations + * + * Maintains an HTTP connection so that subsequent operations don't need to go + * through the whole connection setup process + * + */ +class CloudCacheSession +{ +public: + CloudCacheSession(CloudCacheClient* CacheClient); + ~CloudCacheSession(); + + CloudCacheResult Authenticate(); + + CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); + CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath = {}); + + PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); + CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); + + FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + + CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); + + GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); + + CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key); + + CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); + + std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + + CloudCacheResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); + CloudCacheResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + CloudCacheResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + PutBuildPartResult PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload); + CloudCacheResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + CloudCacheResult PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload); + CloudCacheResult GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + std::filesystem::path TempFolderPath); + CloudCacheResult PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + const IoBuffer& Payload); + FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash); + CloudCacheResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + + CloudCacheClient& Client() { return *m_CacheClient; }; + +private: + inline LoggerRef Log() { return m_Log; } + + CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); + + CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); + + LoggerRef m_Log; + RefPtr<CloudCacheClient> m_CacheClient; +}; + +/** + * Access token provider interface + */ +class CloudCacheTokenProvider +{ +public: + virtual ~CloudCacheTokenProvider() = default; + + virtual CloudCacheAccessToken AcquireAccessToken() = 0; + + static std::unique_ptr<CloudCacheTokenProvider> CreateFromStaticToken(CloudCacheAccessToken Token); + + struct OAuthClientCredentialsParams + { + std::string_view Url; + std::string_view ClientId; + std::string_view ClientSecret; + }; + + static std::unique_ptr<CloudCacheTokenProvider> CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params); + + static std::unique_ptr<CloudCacheTokenProvider> CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback); +}; + +struct CloudCacheClientOptions +{ + std::string_view Name; + std::string_view ServiceUrl; + std::string_view DdcNamespace; + std::string_view BlobStoreNamespace; + std::string_view ComputeCluster; + std::chrono::milliseconds ConnectTimeout{5000}; + std::chrono::milliseconds Timeout{}; + bool AssumeHttp2 = false; + bool AllowResume = false; + uint8_t RetryCount = 0; +}; + +/** + * Jupiter upstream cache client + */ +class CloudCacheClient : public RefCounted +{ +public: + CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider); + ~CloudCacheClient(); + + std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } + std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } + std::string_view ComputeCluster() const { return m_ComputeCluster; } + std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); } + + LoggerRef Logger() { return m_Log; } + +private: + LoggerRef m_Log; + const std::string m_DefaultDdcNamespace; + const std::string m_DefaultBlobStoreNamespace; + const std::string m_ComputeCluster; + const std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider; + HttpClient m_HttpClient; + + friend class CloudCacheSession; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h index 3eb9021dd..758722156 100644 --- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h +++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h @@ -2,8 +2,8 @@ #pragma once +#include <zencore/basicfile.h> #include <zencore/memory/llm.h> -#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <spdlog/details/log_msg.h> diff --git a/src/zenutil/include/zenutil/packageformat.h b/src/zenutil/include/zenutil/packageformat.h deleted file mode 100644 index c90b840da..000000000 --- a/src/zenutil/include/zenutil/packageformat.h +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/compactbinarypackage.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> - -#include <functional> -#include <gsl/gsl-lite.hpp> - -namespace zen { - -class IoBuffer; -class CbPackage; -class CompositeBuffer; - -/** _____ _ _____ _ - / ____| | | __ \ | | - | | | |__ | |__) |_ _ ___| | ____ _ __ _ ___ - | | | '_ \| ___/ _` |/ __| |/ / _` |/ _` |/ _ \ - | |____| |_) | | | (_| | (__| < (_| | (_| | __/ - \_____|_.__/|_| \__,_|\___|_|\_\__,_|\__, |\___| - __/ | - |___/ - - Structures and code related to handling CbPackage transactions - - CbPackage instances are marshaled across the wire using a distinct message - format. We don't use the CbPackage serialization format provided by the - CbPackage implementation itself since that does not provide much flexibility - in how the attachment payloads are transmitted. The scheme below separates - metadata cleanly from payloads and this enables us to more efficiently - transmit them either via sendfile/TransmitFile like mechanisms, or by - reference/memory mapping in the local case. - */ - -struct CbPackageHeader -{ - uint32_t HeaderMagic; - uint32_t AttachmentCount; // TODO: should add ability to opt out of implicit root document? - uint32_t Reserved1; - uint32_t Reserved2; -}; - -static_assert(sizeof(CbPackageHeader) == 16); - -enum : uint32_t -{ - kCbPkgMagic = 0xaa77aacc -}; - -struct CbAttachmentEntry -{ - uint64_t PayloadSize; // Size of the associated payload data in the message - uint32_t Flags; // See flags below - IoHash AttachmentHash; // Content Id for the attachment - - enum - { - kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format - kIsObject = (1u << 1), // Is compact binary object - kIsError = (1u << 2), // Is error (compact binary formatted) object - kIsLocalRef = (1u << 3), // Is "local reference" - }; -}; - -struct CbAttachmentReferenceHeader -{ - uint64_t PayloadByteOffset = 0; - uint64_t PayloadByteSize = ~0u; - uint16_t AbsolutePathLength = 0; - - // This header will be followed by UTF8 encoded absolute path to backing file -}; - -static_assert(sizeof(CbAttachmentEntry) == 32); - -enum class FormatFlags -{ - kDefault = 0, - kAllowLocalReferences = (1u << 0), - kDenyPartialLocalReferences = (1u << 1) -}; - -gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags); - -enum class RpcAcceptOptions : uint16_t -{ - kNone = 0, - kAllowLocalReferences = (1u << 0), - kAllowPartialLocalReferences = (1u << 1), - kAllowPartialCacheChunks = (1u << 2) -}; - -gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions); - -std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle = nullptr); -CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle = nullptr); -CbPackage ParsePackageMessage( - IoBuffer Payload, - std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer { - return IoBuffer{Size}; - }); -bool IsPackageMessage(IoBuffer Payload); - -bool ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage); - -std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, void* TargetProcessHandle = nullptr); -CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, void* TargetProcessHandle = nullptr); - -/** Streaming reader for compact binary packages - - The goal is to ultimately support zero-copy I/O, but for now there'll be some - copying involved on some platforms at least. - - This approach to deserializing CbPackage data is more efficient than - `ParsePackageMessage` since it does not require the entire message to - be resident in a memory buffer - - */ -class CbPackageReader -{ -public: - CbPackageReader(); - ~CbPackageReader(); - - void SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer); - - /** Process compact binary package data stream - - The data stream must be in the serialization format produced by FormatPackageMessage - - \return How many bytes must be fed to this function in the next call - */ - uint64_t ProcessPackageHeaderData(const void* Data, uint64_t DataBytes); - - void Finalize(); - const std::vector<CbAttachment>& GetAttachments() { return m_Attachments; } - CbObject GetRootObject() { return m_RootObject; } - std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; } - -private: - enum class State - { - kInitialState, - kReadingHeader, - kReadingAttachmentEntries, - kReadingBuffers - } m_CurrentState = State::kInitialState; - - std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> m_CreateBuffer; - std::vector<IoBuffer> m_PayloadBuffers; - std::vector<CbAttachmentEntry> m_AttachmentEntries; - std::vector<CbAttachment> m_Attachments; - CbObject m_RootObject; - CbPackageHeader m_PackageHeader; - - IoBuffer MarshalLocalChunkReference(IoBuffer AttachmentBuffer); -}; - -void forcelink_packageformat(); - -} // namespace zen |