diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenutil/include | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenutil/include')
| -rw-r--r-- | src/zenutil/include/zenutil/basicfile.h | 113 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/cache.h | 6 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/cachekey.h | 86 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/cachepolicy.h | 227 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/cacherequests.h | 279 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/rpcrecording.h | 29 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/zenserverprocess.h | 141 |
7 files changed, 881 insertions, 0 deletions
diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h new file mode 100644 index 000000000..877df0f92 --- /dev/null +++ b/src/zenutil/include/zenutil/basicfile.h @@ -0,0 +1,113 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iobuffer.h> + +#include <filesystem> +#include <functional> + +namespace zen { + +class CbObject; + +/** + * Probably the most basic file abstraction in the universe + * + * One thing of note is that there is no notion of a "current file position" + * in this API -- all reads and writes are done from explicit offsets in + * the file. This avoids concurrency issues which can occur otherwise. + * + */ + +class BasicFile +{ +public: + BasicFile() = default; + ~BasicFile(); + + BasicFile(const BasicFile&) = delete; + BasicFile& operator=(const BasicFile&) = delete; + + enum class Mode : uint32_t + { + kRead = 0, // Opens a existing file for read only + kWrite = 1, // Opens (or creates) a file for read and write + kTruncate = 2, // Opens (or creates) a file for read and write and sets the size to zero + kDelete = 3, // Opens (or creates) a file for read and write allowing .DeleteFile file disposition to be set + kTruncateDelete = + 4 // Opens (or creates) a file for read and write and sets the size to zero allowing .DeleteFile file disposition to be set + }; + + void Open(const std::filesystem::path& FileName, Mode Mode); + void Open(const std::filesystem::path& FileName, Mode Mode, std::error_code& Ec); + void Close(); + void Read(void* Data, uint64_t Size, uint64_t FileOffset); + void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); + void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); + void Write(MemoryView Data, uint64_t FileOffset); + void Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec); + void Write(const void* Data, uint64_t Size, uint64_t FileOffset); + void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec); + void Flush(); + uint64_t FileSize(); + void SetFileSize(uint64_t FileSize); + IoBuffer ReadAll(); + void WriteAll(IoBuffer Data, std::error_code& Ec); + void* Detach(); + + inline void* Handle() { return m_FileHandle; } + +protected: + void* m_FileHandle = nullptr; // This is either null or valid +private: +}; + +/** + * Simple abstraction for a temporary file + * + * Works like a regular BasicFile but implements a simple mechanism to allow creating + * a temporary file for writing in a directory which may later be moved atomically + * into the intended location after it has been fully written to. + * + */ + +class TemporaryFile : public BasicFile +{ +public: + TemporaryFile() = default; + ~TemporaryFile(); + + TemporaryFile(const TemporaryFile&) = delete; + TemporaryFile& operator=(const TemporaryFile&) = delete; + + void Close(); + void CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec); + void MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec); + const std::filesystem::path& GetPath() const { return m_TempPath; } + +private: + std::filesystem::path m_TempPath; + + using BasicFile::Open; +}; + +/** Lock file abstraction + + */ + +class LockFile : protected BasicFile +{ +public: + LockFile(); + ~LockFile(); + + void Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec); + void Update(CbObject Payload, std::error_code& Ec); + +private: +}; + +ZENCORE_API void basicfile_forcelink(); + +} // namespace zen diff --git a/src/zenutil/include/zenutil/cache/cache.h b/src/zenutil/include/zenutil/cache/cache.h new file mode 100644 index 000000000..1a1dd9386 --- /dev/null +++ b/src/zenutil/include/zenutil/cache/cache.h @@ -0,0 +1,6 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenutil/cache/cachekey.h> +#include <zenutil/cache/cachepolicy.h> diff --git a/src/zenutil/include/zenutil/cache/cachekey.h b/src/zenutil/include/zenutil/cache/cachekey.h new file mode 100644 index 000000000..741375946 --- /dev/null +++ b/src/zenutil/include/zenutil/cache/cachekey.h @@ -0,0 +1,86 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> +#include <zencore/string.h> +#include <zencore/uid.h> + +#include <zenutil/cache/cachepolicy.h> + +namespace zen { + +struct CacheKey +{ + std::string Bucket; + IoHash Hash; + + static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; } + + auto operator<=>(const CacheKey& that) const + { + if (auto b = caseSensitiveCompareStrings(Bucket, that.Bucket); b != std::strong_ordering::equal) + { + return b; + } + return Hash <=> that.Hash; + } + + auto operator==(const CacheKey& that) const { return (*this <=> that) == std::strong_ordering::equal; } + + static const CacheKey Empty; +}; + +struct CacheChunkRequest +{ + CacheKey Key; + IoHash ChunkId; + Oid ValueId; + uint64_t RawOffset = 0ull; + uint64_t RawSize = ~uint64_t(0); + CachePolicy Policy = CachePolicy::Default; +}; + +struct CacheKeyRequest +{ + CacheKey Key; + CacheRecordPolicy Policy; +}; + +struct CacheValueRequest +{ + CacheKey Key; + CachePolicy Policy = CachePolicy::Default; +}; + +inline bool +operator<(const CacheChunkRequest& A, const CacheChunkRequest& B) +{ + if (A.Key < B.Key) + { + return true; + } + if (B.Key < A.Key) + { + return false; + } + if (A.ChunkId < B.ChunkId) + { + return true; + } + if (B.ChunkId < A.ChunkId) + { + return false; + } + if (A.ValueId < B.ValueId) + { + return true; + } + if (B.ValueId < A.ValueId) + { + return false; + } + return A.RawOffset < B.RawOffset; +} + +} // namespace zen diff --git a/src/zenutil/include/zenutil/cache/cachepolicy.h b/src/zenutil/include/zenutil/cache/cachepolicy.h new file mode 100644 index 000000000..9a745e42c --- /dev/null +++ b/src/zenutil/include/zenutil/cache/cachepolicy.h @@ -0,0 +1,227 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/enumflags.h> +#include <zencore/refcount.h> +#include <zencore/string.h> +#include <zencore/uid.h> + +#include <gsl/gsl-lite.hpp> +#include <span> +namespace zen::Private { +class ICacheRecordPolicyShared; +} +namespace zen { + +class CbObjectView; +class CbWriter; + +class OptionalCacheRecordPolicy; + +enum class CachePolicy : uint32_t +{ + /** A value with no flags. Disables access to the cache unless combined with other flags. */ + None = 0, + + /** Allow a cache request to query local caches. */ + QueryLocal = 1 << 0, + /** Allow a cache request to query remote caches. */ + QueryRemote = 1 << 1, + /** Allow a cache request to query any caches. */ + Query = QueryLocal | QueryRemote, + + /** Allow cache requests to query and store records and values in local caches. */ + StoreLocal = 1 << 2, + /** Allow cache records and values to be stored in remote caches. */ + StoreRemote = 1 << 3, + /** Allow cache records and values to be stored in any caches. */ + Store = StoreLocal | StoreRemote, + + /** Allow cache requests to query and store records and values in local caches. */ + Local = QueryLocal | StoreLocal, + /** Allow cache requests to query and store records and values in remote caches. */ + Remote = QueryRemote | StoreRemote, + + /** Allow cache requests to query and store records and values in any caches. */ + Default = Query | Store, + + /** Skip fetching the data for values. */ + SkipData = 1 << 4, + + /** Skip fetching the metadata for record requests. */ + SkipMeta = 1 << 5, + + /** + * Partial output will be provided with the error status when a required value is missing. + * + * This is meant for cases when the missing values can be individually recovered, or rebuilt, + * without rebuilding the whole record. The cache automatically adds this flag when there are + * other cache stores that it may be able to recover missing values from. + * + * Missing values will be returned in the records, but with only the hash and size. + * + * Applying this flag for a put of a record allows a partial record to be stored. + */ + PartialRecord = 1 << 6, + + /** + * Keep records in the cache for at least the duration of the session. + * + * This is a hint that the record may be accessed again in this session. This is mainly meant + * to be used when subsequent accesses will not tolerate a cache miss. + */ + KeepAlive = 1 << 7, +}; + +gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); +/** Append a non-empty text version of the policy to the builder. */ +StringBuilderBase& operator<<(StringBuilderBase& Builder, CachePolicy Policy); +/** Parse non-empty text written by operator<< into a policy. */ +CachePolicy ParseCachePolicy(std::string_view Text); +/** Return input converted into the equivalent policy that the upstream should use when forwarding a put or get to an upstream server. */ +CachePolicy ConvertToUpstream(CachePolicy Policy); + +inline CachePolicy +Union(CachePolicy A, CachePolicy B) +{ + constexpr CachePolicy InvertedFlags = CachePolicy::SkipData | CachePolicy::SkipMeta; + return (A & ~(InvertedFlags)) | (B & ~(InvertedFlags)) | (A & B & InvertedFlags); +} + +/** A value ID and the cache policy to use for that value. */ +struct CacheValuePolicy +{ + Oid Id; + CachePolicy Policy = CachePolicy::Default; + + /** Flags that are valid on a value policy. */ + static constexpr CachePolicy PolicyMask = CachePolicy::Default | CachePolicy::SkipData; +}; + +/** Interface for the private implementation of the cache record policy. */ +class Private::ICacheRecordPolicyShared : public RefCounted +{ +public: + virtual ~ICacheRecordPolicyShared() = default; + virtual void AddValuePolicy(const CacheValuePolicy& Policy) = 0; + virtual std::span<const CacheValuePolicy> GetValuePolicies() const = 0; +}; + +/** + * Flags to control the behavior of cache record requests, with optional overrides by value. + * + * Examples: + * - A base policy of None with value policy overrides of Default will fetch those values if they + * exist in the record, and skip data for any other values. + * - A base policy of Default, with value policy overrides of (Query | SkipData), will skip those + * values, but still check if they exist, and will load any other values. + */ +class CacheRecordPolicy +{ +public: + /** Construct a cache record policy that uses the default policy. */ + CacheRecordPolicy() = default; + + /** Construct a cache record policy with a uniform policy for the record and every value. */ + inline CacheRecordPolicy(CachePolicy BasePolicy) + : RecordPolicy(BasePolicy) + , DefaultValuePolicy(BasePolicy & CacheValuePolicy::PolicyMask) + { + } + + /** Returns true if the record and every value use the same cache policy. */ + inline bool IsUniform() const { return !Shared; } + + /** Returns the cache policy to use for the record. */ + inline CachePolicy GetRecordPolicy() const { return RecordPolicy; } + + /** Returns the base cache policy that this was constructed from. */ + inline CachePolicy GetBasePolicy() const { return DefaultValuePolicy | (RecordPolicy & ~CacheValuePolicy::PolicyMask); } + + /** Returns the cache policy to use for the value. */ + CachePolicy GetValuePolicy(const Oid& Id) const; + + /** Returns the array of cache policy overrides for values, sorted by ID. */ + inline std::span<const CacheValuePolicy> GetValuePolicies() const + { + return Shared ? Shared->GetValuePolicies() : std::span<const CacheValuePolicy>(); + } + + /** Saves the cache record policy to a compact binary object. */ + void Save(CbWriter& Writer) const; + + /** Loads a cache record policy from an object. */ + static OptionalCacheRecordPolicy Load(CbObjectView Object); + + /** Return *this converted into the equivalent policy that the upstream should use when forwarding a put or get to an upstream server. + */ + CacheRecordPolicy ConvertToUpstream() const; + +private: + friend class CacheRecordPolicyBuilder; + friend class OptionalCacheRecordPolicy; + + CachePolicy RecordPolicy = CachePolicy::Default; + CachePolicy DefaultValuePolicy = CachePolicy::Default; + RefPtr<const Private::ICacheRecordPolicyShared> Shared; +}; + +/** A cache record policy builder is used to construct a cache record policy. */ +class CacheRecordPolicyBuilder +{ +public: + /** Construct a policy builder that uses the default policy as its base policy. */ + CacheRecordPolicyBuilder() = default; + + /** Construct a policy builder that uses the provided policy for the record and values with no override. */ + inline explicit CacheRecordPolicyBuilder(CachePolicy Policy) : BasePolicy(Policy) {} + + /** Adds a cache policy override for a value. */ + void AddValuePolicy(const CacheValuePolicy& Value); + inline void AddValuePolicy(const Oid& Id, CachePolicy Policy) { AddValuePolicy({Id, Policy}); } + + /** Build a cache record policy, which makes this builder subsequently unusable. */ + CacheRecordPolicy Build(); + +private: + CachePolicy BasePolicy = CachePolicy::Default; + RefPtr<Private::ICacheRecordPolicyShared> Shared; +}; + +/** + * A cache record policy that can be null. + * + * @see CacheRecordPolicy + */ +class OptionalCacheRecordPolicy : private CacheRecordPolicy +{ +public: + inline OptionalCacheRecordPolicy() : CacheRecordPolicy(~CachePolicy::None) {} + + inline OptionalCacheRecordPolicy(CacheRecordPolicy&& InOutput) : CacheRecordPolicy(std::move(InOutput)) {} + inline OptionalCacheRecordPolicy(const CacheRecordPolicy& InOutput) : CacheRecordPolicy(InOutput) {} + inline OptionalCacheRecordPolicy& operator=(CacheRecordPolicy&& InOutput) + { + CacheRecordPolicy::operator=(std::move(InOutput)); + return *this; + } + inline OptionalCacheRecordPolicy& operator=(const CacheRecordPolicy& InOutput) + { + CacheRecordPolicy::operator=(InOutput); + return *this; + } + + /** Returns the cache record policy. The caller must check for null before using this accessor. */ + inline const CacheRecordPolicy& Get() const& { return *this; } + inline CacheRecordPolicy Get() && { return std::move(*this); } + + inline bool IsNull() const { return RecordPolicy == ~CachePolicy::None; } + inline bool IsValid() const { return !IsNull(); } + inline explicit operator bool() const { return !IsNull(); } + + inline void Reset() { *this = OptionalCacheRecordPolicy(); } +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/cache/cacherequests.h b/src/zenutil/include/zenutil/cache/cacherequests.h new file mode 100644 index 000000000..f1999ebfe --- /dev/null +++ b/src/zenutil/include/zenutil/cache/cacherequests.h @@ -0,0 +1,279 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compress.h> + +#include "cachekey.h" +#include "cachepolicy.h" + +#include <functional> + +namespace zen { + +class CbPackage; +class CbObjectWriter; +class CbObjectView; + +namespace cacherequests { + // I'd really like to get rid of std::optional<CacheRecordPolicy> (or really the class CacheRecordPolicy) + // + // CacheRecordPolicy has a record level policy but it can also contain policies for individual + // values inside the record. + // + // However, when we do a "PutCacheRecords" we already list the individual Values with their Id + // so we can just as well use an optional plain CachePolicy for each value. + // + // In "GetCacheRecords" we do not currently as for the individual values but you can add + // a policy on a per-value level in the std::optional<CacheRecordPolicy> Policy for each record. + // + // But as we already need to know the Ids of the values we want to set the policy for + // it would be simpler to add an array of requested values which each has an optional policy. + // + // We could add: + // struct GetCacheRecordValueRequest + // { + // Oid Id; + // std::optional<CachePolicy> Policy; + // }; + // + // and change GetCacheRecordRequest to + // struct GetCacheRecordRequest + // { + // CacheKey Key = CacheKey::Empty; + // std::vector<GetCacheRecordValueRequest> ValueRequests; + // std::optional<CachePolicy> Policy; + // }; + // + // This way we don't need the complex CacheRecordPolicy class and the request becomes + // more uniform and easier to understand. + // + // Would need to decide what the ValueRequests actually mean: + // Do they dictate which values to fetch or just a change of the policy? + // If they dictate the values to fetch you need to know all the value ids to set them + // and that is unlikely what we want - we want to be able to get a cache record with + // all its values without knowing all the Ids, right? + // + + ////////////////////////////////////////////////////////////////////////// + // Put 1..n structured cache records with optional attachments + + struct PutCacheRecordRequestValue + { + Oid Id = Oid::Zero; + IoHash RawHash = IoHash::Zero; // If Body is not set, this must be set and the value must already exist in cache + CompressedBuffer Body = CompressedBuffer::Null; + }; + + struct PutCacheRecordRequest + { + CacheKey Key = CacheKey::Empty; + std::vector<PutCacheRecordRequestValue> Values; + std::optional<CacheRecordPolicy> Policy; + }; + + struct PutCacheRecordsRequest + { + uint32_t AcceptMagic = 0; + CachePolicy DefaultPolicy = CachePolicy::Default; + std::string Namespace; + std::vector<PutCacheRecordRequest> Requests; + + bool Parse(const CbPackage& Package); + bool Format(CbPackage& OutPackage) const; + }; + + struct PutCacheRecordsResult + { + std::vector<bool> Success; + + bool Parse(const CbPackage& Package); + bool Format(CbPackage& OutPackage) const; + }; + + ////////////////////////////////////////////////////////////////////////// + // Get 1..n structured cache records with optional attachments + // We can get requests for a cache record where we want care about a particular + // value id which we now of, but we don't know the ids of the other values and + // we still want them. + // Not sure if in that case we want different policies for the different attachemnts? + + struct GetCacheRecordRequest + { + CacheKey Key = CacheKey::Empty; + std::optional<CacheRecordPolicy> Policy; + }; + + struct GetCacheRecordsRequest + { + uint32_t AcceptMagic = 0; + uint16_t AcceptOptions = 0; + int32_t ProcessPid = 0; + CachePolicy DefaultPolicy = CachePolicy::Default; + std::string Namespace; + std::vector<GetCacheRecordRequest> Requests; + + bool Parse(const CbPackage& RpcRequest); + bool Parse(const CbObjectView& RpcRequest); + bool Format(CbPackage& OutPackage, const std::span<const size_t> OptionalRecordFilter = {}) const; + bool Format(CbObjectWriter& Writer, const std::span<const size_t> OptionalRecordFilter = {}) const; + }; + + struct GetCacheRecordResultValue + { + Oid Id = Oid::Zero; + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; + CompressedBuffer Body = CompressedBuffer::Null; + }; + + struct GetCacheRecordResult + { + CacheKey Key = CacheKey::Empty; + std::vector<GetCacheRecordResultValue> Values; + }; + + struct GetCacheRecordsResult + { + std::vector<std::optional<GetCacheRecordResult>> Results; + + bool Parse(const CbPackage& Package, const std::span<const size_t> OptionalRecordResultIndexes = {}); + bool Format(CbPackage& OutPackage) const; + }; + + ////////////////////////////////////////////////////////////////////////// + // Put 1..n unstructured cache objects + + struct PutCacheValueRequest + { + CacheKey Key = CacheKey::Empty; + IoHash RawHash = IoHash::Zero; + CompressedBuffer Body = CompressedBuffer::Null; // If not set the value is expected to already exist in cache store + std::optional<CachePolicy> Policy; + }; + + struct PutCacheValuesRequest + { + uint32_t AcceptMagic = 0; + CachePolicy DefaultPolicy = CachePolicy::Default; + std::string Namespace; + std::vector<PutCacheValueRequest> Requests; + + bool Parse(const CbPackage& Package); + bool Format(CbPackage& OutPackage) const; + }; + + struct PutCacheValuesResult + { + std::vector<bool> Success; + + bool Parse(const CbPackage& Package); + bool Format(CbPackage& OutPackage) const; + }; + + ////////////////////////////////////////////////////////////////////////// + // Get 1..n unstructured cache objects (stored data may be structured or unstructured) + + struct GetCacheValueRequest + { + CacheKey Key = CacheKey::Empty; + std::optional<CachePolicy> Policy; + }; + + struct GetCacheValuesRequest + { + uint32_t AcceptMagic = 0; + uint16_t AcceptOptions = 0; + int32_t ProcessPid = 0; + CachePolicy DefaultPolicy = CachePolicy::Default; + std::string Namespace; + std::vector<GetCacheValueRequest> Requests; + + bool Parse(const CbObjectView& BatchObject); + bool Format(CbPackage& OutPackage, const std::span<const size_t> OptionalValueFilter = {}) const; + }; + + struct CacheValueResult + { + uint64_t RawSize = 0; + IoHash RawHash = IoHash::Zero; + CompressedBuffer Body = CompressedBuffer::Null; + }; + + struct CacheValuesResult + { + std::vector<CacheValueResult> Results; + + bool Parse(const CbPackage& Package, const std::span<const size_t> OptionalValueResultIndexes = {}); + bool Format(CbPackage& OutPackage) const; + }; + + typedef CacheValuesResult GetCacheValuesResult; + + ////////////////////////////////////////////////////////////////////////// + // Get 1..n cache record values (attachments) for 1..n records + + struct GetCacheChunkRequest + { + CacheKey Key; + Oid ValueId = Oid::Zero; // Set if ChunkId is not known at request time + IoHash ChunkId = IoHash::Zero; + uint64_t RawOffset = 0ull; + uint64_t RawSize = ~uint64_t(0); + std::optional<CachePolicy> Policy; + }; + + struct GetCacheChunksRequest + { + uint32_t AcceptMagic = 0; + uint16_t AcceptOptions = 0; + int32_t ProcessPid = 0; + CachePolicy DefaultPolicy = CachePolicy::Default; + std::string Namespace; + std::vector<GetCacheChunkRequest> Requests; + + bool Parse(const CbObjectView& BatchObject); + bool Format(CbPackage& OutPackage) const; + }; + + typedef CacheValuesResult GetCacheChunksResult; + + ////////////////////////////////////////////////////////////////////////// + + struct HttpRequestData + { + std::optional<std::string> Namespace; + std::optional<std::string> Bucket; + std::optional<IoHash> HashKey; + std::optional<IoHash> ValueContentId; + }; + + bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data); + + // Temporarily public + std::optional<std::string> GetRequestNamespace(const CbObjectView& Params); + bool GetRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key); + + ////////////////////////////////////////////////////////////////////////// + + // struct CacheRecordValue + // { + // Oid Id = Oid::Zero; + // IoHash RawHash = IoHash::Zero; + // uint64_t RawSize = 0; + // }; + // + // struct CacheRecord + // { + // CacheKey Key = CacheKey::Empty; + // std::vector<CacheRecordValue> Values; + // + // bool Parse(CbObjectView& Reader); + // bool Format(CbObjectWriter& Writer) const; + // }; + +} // namespace cacherequests + +void cacherequests_forcelink(); // internal + +} // namespace zen diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h new file mode 100644 index 000000000..6d65a532a --- /dev/null +++ b/src/zenutil/include/zenutil/cache/rpcrecording.h @@ -0,0 +1,29 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compositebuffer.h> +#include <zencore/iobuffer.h> + +namespace zen::cache { +class IRpcRequestRecorder +{ +public: + virtual ~IRpcRequestRecorder() {} + virtual uint64_t RecordRequest(const ZenContentType ContentType, const ZenContentType AcceptType, const IoBuffer& RequestBuffer) = 0; + virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0; + virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0; +}; +class IRpcRequestReplayer +{ +public: + virtual ~IRpcRequestReplayer() {} + virtual uint64_t GetRequestCount() const = 0; + virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; + virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; +}; + +std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath); +std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory); + +} // namespace zen::cache diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h new file mode 100644 index 000000000..1c204c144 --- /dev/null +++ b/src/zenutil/include/zenutil/zenserverprocess.h @@ -0,0 +1,141 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/enumflags.h> +#include <zencore/logging.h> +#include <zencore/thread.h> +#include <zencore/uid.h> + +#include <atomic> +#include <filesystem> +#include <optional> + +namespace zen { + +class ZenServerEnvironment +{ +public: + ZenServerEnvironment(); + ~ZenServerEnvironment(); + + void Initialize(std::filesystem::path ProgramBaseDir); + void InitializeForTest(std::filesystem::path ProgramBaseDir, std::filesystem::path TestBaseDir, std::string_view ServerClass = ""); + + std::filesystem::path CreateNewTestDir(); + std::filesystem::path ProgramBaseDir() const { return m_ProgramBaseDir; } + std::filesystem::path GetTestRootDir(std::string_view Path); + inline bool IsInitialized() const { return m_IsInitialized; } + inline bool IsTestEnvironment() const { return m_IsTestInstance; } + inline std::string_view GetServerClass() const { return m_ServerClass; } + +private: + std::filesystem::path m_ProgramBaseDir; + std::filesystem::path m_TestBaseDir; + bool m_IsInitialized = false; + bool m_IsTestInstance = false; + std::string m_ServerClass; +}; + +struct ZenServerInstance +{ + ZenServerInstance(ZenServerEnvironment& TestEnvironment); + ~ZenServerInstance(); + + void Shutdown(); + void SignalShutdown(); + void WaitUntilReady(); + [[nodiscard]] bool WaitUntilReady(int Timeout); + void EnableTermination() { m_Terminate = true; } + void Detach(); + inline int GetPid() { return m_Process.Pid(); } + inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; } + + void SetTestDir(std::filesystem::path TestDir) + { + ZEN_ASSERT(!m_Process.IsValid()); + m_TestDir = TestDir; + } + + void SpawnServer(int BasePort = 0, std::string_view AdditionalServerArgs = std::string_view()); + + void AttachToRunningServer(int BasePort = 0); + + std::string GetBaseUri() const; + +private: + ZenServerEnvironment& m_Env; + ProcessHandle m_Process; + NamedEvent m_ReadyEvent; + NamedEvent m_ShutdownEvent; + bool m_Terminate = false; + std::filesystem::path m_TestDir; + int m_BasePort = 0; + std::optional<int> m_OwnerPid; + + void CreateShutdownEvent(int BasePort); +}; + +/** Shared system state + * + * Used as a scratchpad to identify running instances etc + * + * The state lives in a memory-mapped file backed by the swapfile + * + */ + +class ZenServerState +{ +public: + ZenServerState(); + ~ZenServerState(); + + struct ZenServerEntry + { + // NOTE: any changes to this should consider backwards compatibility + // which means you should not rearrange members only potentially + // add something to the end or use a different mechanism for + // additional state. For example, you can use the session ID + // to introduce additional named objects + std::atomic<uint32_t> Pid; + std::atomic<uint16_t> DesiredListenPort; + std::atomic<uint16_t> Flags; + uint8_t SessionId[12]; + std::atomic<uint32_t> SponsorPids[8]; + std::atomic<uint16_t> EffectiveListenPort; + uint8_t Padding[10]; + + enum class FlagsEnum : uint16_t + { + kShutdownPlease = 1 << 0, + kIsReady = 1 << 1, + }; + + FRIEND_ENUM_CLASS_FLAGS(FlagsEnum); + + Oid GetSessionId() const { return Oid::FromMemory(SessionId); } + void Reset(); + void SignalShutdownRequest(); + void SignalReady(); + bool AddSponsorProcess(uint32_t Pid); + }; + + static_assert(sizeof(ZenServerEntry) == 64); + + void Initialize(); + [[nodiscard]] bool InitializeReadOnly(); + [[nodiscard]] ZenServerEntry* Lookup(int DesiredListenPort); + ZenServerEntry* Register(int DesiredListenPort); + void Sweep(); + void Snapshot(std::function<void(const ZenServerEntry&)>&& Callback); + inline bool IsReadOnly() const { return m_IsReadOnly; } + +private: + void* m_hMapFile = nullptr; + ZenServerEntry* m_Data = nullptr; + int m_MaxEntryCount = 65536 / sizeof(ZenServerEntry); + ZenServerEntry* m_OurEntry = nullptr; + bool m_IsReadOnly = true; +}; + +} // namespace zen |