diff options
| author | Martin Ridgers <[email protected]> | 2021-09-16 17:08:01 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-09-16 17:08:01 +0200 |
| commit | 8da2c13a34fd6394aecaf19490d65a8a84592e3c (patch) | |
| tree | 702cb3aec8145209fb5d8e39d8bf6d1432dd1a33 | |
| parent | Another missing include (diff) | |
| parent | Compact binary package caching support (#9) (diff) | |
| download | zen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.tar.xz zen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.zip | |
Merge main into linux-mac
36 files changed, 971 insertions, 273 deletions
diff --git a/zencore-test/zencore-test.cpp b/zencore-test/zencore-test.cpp index 1782f1926..559349076 100644 --- a/zencore-test/zencore-test.cpp +++ b/zencore-test/zencore-test.cpp @@ -1,8 +1,8 @@ // zencore-test.cpp : Defines the entry point for the console application. // -#include <zencore/zencore.h> #include <zencore/logging.h> +#include <zencore/zencore.h> #define DOCTEST_CONFIG_IMPLEMENT #include <doctest/doctest.h> diff --git a/zencore/except.cpp b/zencore/except.cpp index 75d0c8dd1..84e52ab9f 100644 --- a/zencore/except.cpp +++ b/zencore/except.cpp @@ -20,7 +20,7 @@ ThrowSystemException([[maybe_unused]] HRESULT hRes, [[maybe_unused]] std::string } } -#endif // ZEN_PLATFORM_WINDOWS +#endif // ZEN_PLATFORM_WINDOWS void ThrowLastError(std::string_view Message) diff --git a/zencore/include/zencore/atomic.h b/zencore/include/zencore/atomic.h index 7e261771b..bf549e21d 100644 --- a/zencore/include/zencore/atomic.h +++ b/zencore/include/zencore/atomic.h @@ -5,9 +5,9 @@ #include <zencore/zencore.h> #if ZEN_COMPILER_MSC -#include <intrin.h> +# include <intrin.h> #else -#include <atomic> +# include <atomic> #endif #include <cinttypes> diff --git a/zencore/include/zencore/except.h b/zencore/include/zencore/except.h index 90f7d45db..f0e04a795 100644 --- a/zencore/include/zencore/except.h +++ b/zencore/include/zencore/except.h @@ -51,7 +51,7 @@ private: }; ZENCORE_API void ThrowSystemException(HRESULT hRes, std::string_view Message); -#endif // ZEN_PLATFORM_WINDOWS +#endif // ZEN_PLATFORM_WINDOWS ZENCORE_API void ThrowLastError(std::string_view Message); diff --git a/zencore/include/zencore/intmath.h b/zencore/include/zencore/intmath.h index 2fdea22b5..7619e1950 100644 --- a/zencore/include/zencore/intmath.h +++ b/zencore/include/zencore/intmath.h @@ -179,6 +179,6 @@ Max(auto x, auto y) ////////////////////////////////////////////////////////////////////////// -void intmath_forcelink(); // internal +void intmath_forcelink(); // internal } // namespace zen diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index f3120983e..298952dd6 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -15,14 +15,14 @@ struct IoBufferExtendedCore; enum class ZenContentType : uint8_t { - kBinary = 0, // Note that since this is zero, this will be the default value in IoBuffer - kText = 1, - kJSON = 2, - kCbObject = 3, - kCbPackage = 4, - kYAML = 5, - kCbPackageOffer = 6, - kCompressedBinary = 7, + kBinary = 0, // Note that since this is zero, this will be the default value in IoBuffer + kText = 1, + kJSON = 2, + kCbObject = 3, + kCbPackage = 4, + kYAML = 5, + kCbPackageOffer = 6, + kCompressedBinary = 7, kUnknownContentType = 8, kCOUNT }; diff --git a/zencore/include/zencore/memory.h b/zencore/include/zencore/memory.h index 9d6339595..3d4db1081 100644 --- a/zencore/include/zencore/memory.h +++ b/zencore/include/zencore/memory.h @@ -83,7 +83,7 @@ struct MutableMemoryView { } - inline bool IsEmpty() const { return m_Data == m_DataEnd; } + inline bool IsEmpty() const { return m_Data == m_DataEnd; } void* GetData() const { return m_Data; } void* GetDataEnd() const { return m_DataEnd; } size_t GetSize() const { return reinterpret_cast<uint8_t*>(m_DataEnd) - reinterpret_cast<uint8_t*>(m_Data); } @@ -194,10 +194,10 @@ struct MemoryView { } - inline bool Contains(const MemoryView& Other) const { return (m_Data <= Other.m_Data) && (m_DataEnd >= Other.m_DataEnd); } - inline bool IsEmpty() const { return m_Data == m_DataEnd; } - const void* GetData() const { return m_Data; } - const void* GetDataEnd() const { return m_DataEnd; } + inline bool Contains(const MemoryView& Other) const { return (m_Data <= Other.m_Data) && (m_DataEnd >= Other.m_DataEnd); } + inline bool IsEmpty() const { return m_Data == m_DataEnd; } + const void* GetData() const { return m_Data; } + const void* GetDataEnd() const { return m_DataEnd; } size_t GetSize() const { return reinterpret_cast<const uint8_t*>(m_DataEnd) - reinterpret_cast<const uint8_t*>(m_Data); } inline bool operator==(const MemoryView& Rhs) const { return m_Data == Rhs.m_Data && m_DataEnd == Rhs.m_DataEnd; } diff --git a/zencore/include/zencore/session.h b/zencore/include/zencore/session.h index e66794704..2da41b2c8 100644 --- a/zencore/include/zencore/session.h +++ b/zencore/include/zencore/session.h @@ -1,3 +1,5 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + #pragma once #include <zencore/zencore.h> @@ -8,4 +10,4 @@ struct Oid; ZENCORE_API Oid GetSessionId(); -} +} // namespace zen diff --git a/zencore/include/zencore/zencore.h b/zencore/include/zencore/zencore.h index 73446b447..54df7e85e 100644 --- a/zencore/include/zencore/zencore.h +++ b/zencore/include/zencore/zencore.h @@ -10,39 +10,39 @@ // Platform // -#define ZEN_PLATFORM_WINDOWS 0 -#define ZEN_PLATFORM_LINUX 0 -#define ZEN_PLATFORM_MACOS 0 +#define ZEN_PLATFORM_WINDOWS 0 +#define ZEN_PLATFORM_LINUX 0 +#define ZEN_PLATFORM_MACOS 0 #ifdef _WIN32 -# undef ZEN_PLATFORM_WINDOWS -# define ZEN_PLATFORM_WINDOWS 1 +# undef ZEN_PLATFORM_WINDOWS +# define ZEN_PLATFORM_WINDOWS 1 #elif defined(__linux__) -# undef ZEN_PLATFORM_LINUX -# define ZEN_PLATFORM_LINUX 1 +# undef ZEN_PLATFORM_LINUX +# define ZEN_PLATFORM_LINUX 1 #elif defined(__APPLE__) -# undef ZEN_PLATFORM_MACOS -# define ZEN_PLATFORM_MACOS 1 +# undef ZEN_PLATFORM_MACOS +# define ZEN_PLATFORM_MACOS 1 #endif ////////////////////////////////////////////////////////////////////////// // Compiler // -#define ZEN_COMPILER_CLANG 0 -#define ZEN_COMPILER_MSC 0 -#define ZEN_COMPILER_GCC 0 +#define ZEN_COMPILER_CLANG 0 +#define ZEN_COMPILER_MSC 0 +#define ZEN_COMPILER_GCC 0 // Clang can define __GNUC__ and/or _MSC_VER so we check for Clang first #ifdef __clang__ -# undef ZEN_COMPILER_CLANG -# define ZEN_COMPILER_CLANG 1 +# undef ZEN_COMPILER_CLANG +# define ZEN_COMPILER_CLANG 1 #elif defined(_MSC_VER) -# undef ZEN_COMPILER_MSC -# define ZEN_COMPILER_MSC 1 +# undef ZEN_COMPILER_MSC +# define ZEN_COMPILER_MSC 1 #elif defined(__GNUC__) -# undef ZEN_COMPILER_GCC -# define ZEN_COMPILER_GCC 1 +# undef ZEN_COMPILER_GCC +# define ZEN_COMPILER_GCC 1 #else # error Unknown compiler #endif @@ -56,17 +56,16 @@ # endif #endif - ////////////////////////////////////////////////////////////////////////// // Architecture // #if defined(__amd64__) || defined(_M_X64) -# define ZEN_ARCH_X64 1 -# define ZEN_ARCH_ARM64 0 +# define ZEN_ARCH_X64 1 +# define ZEN_ARCH_ARM64 0 #elif defined(__arm64__) || defined(_M_ARM64) -# define ZEN_ARCH_X64 0 -# define ZEN_ARCH_ARM64 1 +# define ZEN_ARCH_X64 0 +# define ZEN_ARCH_ARM64 1 #else # error Unknown architecture #endif @@ -142,13 +141,13 @@ char (&ZenArrayCountHelper(const T (&)[N]))[N + 1]; ////////////////////////////////////////////////////////////////////////// #if ZEN_COMPILER_MSC -# define ZEN_NOINLINE __declspec(noinline) +# define ZEN_NOINLINE __declspec(noinline) #else -# define ZEN_NOINLINE __attribute__((noinline)) +# define ZEN_NOINLINE __attribute__((noinline)) #endif #define ZEN_UNUSED(...) ((void)__VA_ARGS__) -#define ZEN_NOT_IMPLEMENTED(...) ZEN_ASSERT(false) +#define ZEN_NOT_IMPLEMENTED(...) ZEN_ASSERT(false, __VA_ARGS__) #define ZENCORE_API // Placeholder to allow DLL configs in the future ZENCORE_API bool IsPointerToStack(const void* ptr); // Query if pointer is within the stack of the currently executing thread diff --git a/zencore/intmath.cpp b/zencore/intmath.cpp index ae65085b6..98c345c79 100644 --- a/zencore/intmath.cpp +++ b/zencore/intmath.cpp @@ -58,4 +58,4 @@ TEST_CASE("intmath") CHECK(ByteSwap(uint64_t(0x214d'6172'7469'6e21ull)) == 0x216e'6974'7261'4d21ull); } -} // namespace zen +} // namespace zen diff --git a/zencore/memory.cpp b/zencore/memory.cpp index 9c7fb8333..26c8321e5 100644 --- a/zencore/memory.cpp +++ b/zencore/memory.cpp @@ -15,7 +15,8 @@ namespace zen { ////////////////////////////////////////////////////////////////////////// -static void* AlignedAllocImpl(size_t size, size_t alignment) +static void* +AlignedAllocImpl(size_t size, size_t alignment) { #if ZEN_PLATFORM_WINDOWS // return _aligned_malloc(size, alignment); // MSVC alternative @@ -26,7 +27,8 @@ static void* AlignedAllocImpl(size_t size, size_t alignment) #endif } -void AlignedFreeImpl(void* ptr) +void +AlignedFreeImpl(void* ptr) { if (ptr == nullptr) return; diff --git a/zencore/session.cpp b/zencore/session.cpp index 195a9d97c..d57d3685b 100644 --- a/zencore/session.cpp +++ b/zencore/session.cpp @@ -1,3 +1,5 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + #include "zencore/session.h" #include <zencore/uid.h> @@ -6,16 +8,15 @@ namespace zen { -static Oid GlobalSessionId; +static Oid GlobalSessionId; static std::once_flag SessionInitFlag; -Oid GetSessionId() +Oid +GetSessionId() { - std::call_once(SessionInitFlag, [&] { - GlobalSessionId.Generate(); - }); + std::call_once(SessionInitFlag, [&] { GlobalSessionId.Generate(); }); - return GlobalSessionId; + return GlobalSessionId; } -}
\ No newline at end of file +} // namespace zen
\ No newline at end of file diff --git a/zencore/zencore.cpp b/zencore/zencore.cpp index 56bdd2ae8..f9b19ba9d 100644 --- a/zencore/zencore.cpp +++ b/zencore/zencore.cpp @@ -3,11 +3,11 @@ #include <zencore/zencore.h> #if ZEN_PLATFORM_WINDOWS -#include <zencore/windows.h> +# include <zencore/windows.h> #endif #if ZEN_PLATFORM_LINUX -#include <pthread.h> +# include <pthread.h> #endif #include <zencore/blake3.h> @@ -46,7 +46,7 @@ IsPointerToStack(const void* ptr) pthread_attr_t attr; pthread_getattr_np(self, &attr); - void* low; + void* low; size_t size; pthread_attr_getstack(&attr, &low, &size); diff --git a/zenhttp/httpclient.cpp b/zenhttp/httpclient.cpp index 7e3e9d374..fb1df30b2 100644 --- a/zenhttp/httpclient.cpp +++ b/zenhttp/httpclient.cpp @@ -10,8 +10,7 @@ #include <zencore/session.h> #include <zencore/sharedbuffer.h> #include <zencore/stream.h> - -#include "httpshared.h" +#include <zenhttp/httpshared.h> #include <doctest/doctest.h> diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp index f4a5f4345..62ee66a08 100644 --- a/zenhttp/httpserver.cpp +++ b/zenhttp/httpserver.cpp @@ -3,7 +3,6 @@ #include <zenhttp/httpserver.h> #include "httpnull.h" -#include "httpshared.h" #include "httpsys.h" #include "httpuws.h" @@ -15,6 +14,7 @@ #include <zencore/stream.h> #include <zencore/string.h> #include <zencore/thread.h> +#include <zenhttp/httpshared.h> #include <conio.h> #include <new.h> diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index 68252a763..2dbf95959 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include "httpshared.h" +#include <zenhttp/httpshared.h> #include <zencore/compactbinarypackage.h> #include <zencore/compositebuffer.h> @@ -58,22 +58,54 @@ FormatPackageMessage(const CbPackage& Data) IoBuffer RootIoBuffer = Data.GetObject().GetBuffer().AsIoBuffer(); ResponseBuffers.push_back(RootIoBuffer); // Root object - *AttachmentInfo++ = {.AttachmentSize = RootIoBuffer.Size(), .AttachmentHash = Data.GetObjectHash()}; + *AttachmentInfo++ = {.AttachmentSize = RootIoBuffer.Size(), + .Flags = CbAttachmentEntry::kIsObject, + .AttachmentHash = Data.GetObjectHash()}; // Attachment payloads for (const CbAttachment& Attachment : Attachments) { - CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary(); - CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); + if (Attachment.IsNull()) + { + ZEN_NOT_IMPLEMENTED("Null attachments are not supported"); + } + else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary()) + { + CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); - *AttachmentInfo++ = {.AttachmentSize = AttachmentBuffer.GetCompressedSize(), - .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + *AttachmentInfo++ = {.AttachmentSize = AttachmentBuffer.GetCompressedSize(), + .Flags = CbAttachmentEntry::kIsCompressed, + .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; - for (const SharedBuffer& Segment : Compressed.GetSegments()) + for (const SharedBuffer& Segment : Compressed.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + TotalAttachmentsSize += Segment.GetSize(); + } + } + else if (CbObject AttachmentObject = Attachment.AsObject()) { - ResponseBuffers.push_back(Segment.AsIoBuffer()); - TotalAttachmentsSize += Segment.GetSize(); + IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer(); + ResponseBuffers.push_back(ObjIoBuffer); + + *AttachmentInfo++ = {.AttachmentSize = ObjIoBuffer.Size(), + .Flags = CbAttachmentEntry::kIsObject, + .AttachmentHash = Attachment.GetHash()}; + } + else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) + { + *AttachmentInfo++ = {.AttachmentSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + + for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + TotalAttachmentsSize += Segment.GetSize(); + } + } + else + { + ZEN_NOT_IMPLEMENTED("Unknown attachment kind"); } } @@ -119,16 +151,45 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint Reader.Read(AttachmentBuffer.MutableData(), AttachmentSize); - CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); - - if (i == 0) + if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { - Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); + CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); + + if (Entry.Flags & CbAttachmentEntry::kIsObject) + { + if (i == 0) + { + Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); + } + else + { + ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported"); + } + } + else + { + CbAttachment Attachment(std::move(CompBuf)); + Package.AddAttachment(Attachment); + } } - else + else /* not compressed */ { - CbAttachment Attachment(std::move(CompBuf)); - Package.AddAttachment(Attachment); + if (Entry.Flags & CbAttachmentEntry::kIsObject) + { + if (i == 0) + { + Package.SetObject(LoadCompactBinaryObject(AttachmentBuffer)); + } + else + { + ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported"); + } + } + else + { + CbAttachment Attachment(SharedBuffer{AttachmentBuffer}); + Package.AddAttachment(Attachment); + } } } diff --git a/zenhttp/httpshared.h b/zenhttp/httpshared.h index 06fdb104f..92c1ef9c6 100644 --- a/zenhttp/httpshared.h +++ b/zenhttp/httpshared.h @@ -28,8 +28,14 @@ static constinit uint32_t kCbPkgMagic = 0xaa77aacc; struct CbAttachmentEntry { uint64_t AttachmentSize; - uint32_t Reserved1; + uint32_t Flags; IoHash AttachmentHash; + + enum + { + kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format + kIsObject = (1u << 1), // Is compact binary object + }; }; static_assert(sizeof(CbAttachmentEntry) == 32); diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index d4c58bffd..d70c88271 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -2,8 +2,6 @@ #include "httpsys.h" -#include "httpshared.h" - #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> @@ -11,6 +9,7 @@ #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/string.h> +#include <zenhttp/httpshared.h> #if ZEN_WITH_HTTPSYS diff --git a/zenhttp/include/zenhttp/httpclient.h b/zenhttp/include/zenhttp/httpclient.h index 3e342f2bd..aa36a8027 100644 --- a/zenhttp/include/zenhttp/httpclient.h +++ b/zenhttp/include/zenhttp/httpclient.h @@ -24,8 +24,8 @@ namespace zen { class CbPackage; /** HTTP client implementation for Zen use cases - - Currently simple and synchronous, should become lean and asynchronous + + Currently simple and synchronous, should become lean and asynchronous */ class HttpClient { diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h new file mode 100644 index 000000000..92c1ef9c6 --- /dev/null +++ b/zenhttp/include/zenhttp/httpshared.h @@ -0,0 +1,51 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> + +#include <functional> + +namespace zen { + +class IoBuffer; +class CbPackage; +class CompositeBuffer; + +struct CbPackageHeader +{ + uint32_t HeaderMagic; + uint32_t AttachmentCount; + uint32_t Reserved1; + uint32_t Reserved2; +}; + +static_assert(sizeof(CbPackageHeader) == 16); + +static constinit uint32_t kCbPkgMagic = 0xaa77aacc; + +struct CbAttachmentEntry +{ + uint64_t AttachmentSize; + uint32_t Flags; + IoHash AttachmentHash; + + enum + { + kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format + kIsObject = (1u << 1), // Is compact binary object + }; +}; + +static_assert(sizeof(CbAttachmentEntry) == 32); + +std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data); +CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data); +CbPackage ParsePackageMessage( + IoBuffer Payload, + std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer { + return IoBuffer{Size}; + }); + +} // namespace zen diff --git a/zenhttp/include/zenhttp/zenhttp.h b/zenhttp/include/zenhttp/zenhttp.h index 586fc98b4..165f34b48 100644 --- a/zenhttp/include/zenhttp/zenhttp.h +++ b/zenhttp/include/zenhttp/zenhttp.h @@ -8,6 +8,6 @@ namespace zen { - ZENHTTP_API void zenhttp_forcelinktests(); - +ZENHTTP_API void zenhttp_forcelinktests(); + } diff --git a/zenhttp/zenhttp.cpp b/zenhttp/zenhttp.cpp index 148cf4499..637486f55 100644 --- a/zenhttp/zenhttp.cpp +++ b/zenhttp/zenhttp.cpp @@ -1,3 +1,5 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + #include <zenhttp/zenhttp.h> #include <zenhttp/httpserver.h> @@ -7,7 +9,7 @@ namespace zen { void zenhttp_forcelinktests() { - http_forcelink(); + http_forcelink(); } -}
\ No newline at end of file +} // namespace zen
\ No newline at end of file diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 213648319..1a41a5541 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -11,11 +11,14 @@ #include <zencore/fmtutils.h> #include <zencore/iohash.h> #include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/stream.h> #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/timer.h> -#include <zenhttp/zenhttp.h> #include <zenhttp/httpclient.h> +#include <zenhttp/httpshared.h> +#include <zenhttp/zenhttp.h> #include <zenserverprocess.h> #include <mimalloc.h> @@ -33,6 +36,7 @@ #include <filesystem> #include <map> #include <random> +#include <span> #include <atlbase.h> #include <process.h> @@ -194,6 +198,8 @@ private: size_t rv = http_parser_execute(&m_HttpParser, &m_HttpParserSettings, (const char*)m_ResponseBuffer.data(), Bytes); + ZEN_UNUSED(rv); + if (m_HttpParser.http_errno != 0) { // Something bad! @@ -1088,7 +1094,7 @@ TEST_CASE("project.pipe") } # endif -TEST_CASE("z$.basic") +TEST_CASE("zcache.basic") { using namespace std::literals; @@ -1179,6 +1185,221 @@ TEST_CASE("z$.basic") } } +TEST_CASE("zcache.cbpackage") +{ + using namespace std::literals; + + auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); + auto CompressedData = zen::CompressedBuffer::Compress(Data); + + OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); + + zen::CbWriter Obj; + Obj.BeginObject("obj"sv); + Obj.AddBinaryAttachment("data", OutAttachmentKey); + Obj.EndObject(); + + zen::CbPackage Package; + Package.SetObject(Obj.Save().AsObject()); + Package.AddAttachment(zen::CbAttachment(CompressedData)); + + return Package; + }; + + auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::MemoryOutStream MemStream; + zen::BinaryWriter Writer(MemStream); + + Package.Save(Writer); + + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { + std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments(); + std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments(); + + if (LhsAttachments.size() != LhsAttachments.size()) + { + return false; + } + + for (const zen::CbAttachment& LhsAttachment : LhsAttachments) + { + const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); + CHECK(RhsAttachment); + + zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); + CHECK(!LhsBuffer.IsNull()); + + zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); + CHECK(!RhsBuffer.IsNull()); + + if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) + { + return false; + } + } + + return true; + }; + + SUBCASE("PUT/GET returns correct package") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); + + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // PUT + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + // GET + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Response); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("PUT propagates upstream") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + const uint16_t LocalPortNumber = 13337; + const uint16_t RemotePortNumber = 13338; + + const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber); + const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + RemoteInstance.SpawnServer(RemotePortNumber); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(LocalPortNumber, + "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber)); + + LocalInstance.WaitUntilReady(); + RemoteInstance.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // Store the cache record package in the local instance + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + + // The cache record can be retrieved as a package from the local instance + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + + // The cache record can be retrieved as a package from the remote instance + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("GET finds upstream when missing in local") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + const uint16_t LocalPortNumber = 13337; + const uint16_t RemotePortNumber = 13338; + + const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber); + const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + RemoteInstance.SpawnServer(RemotePortNumber); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(LocalPortNumber, + "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber)); + + LocalInstance.WaitUntilReady(); + RemoteInstance.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // Store the cache record package in upstream cache + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + + // The cache record can be retrieved as a package from the local cache + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } +} + struct RemoteExecutionRequest { RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath) @@ -1482,6 +1703,10 @@ TEST_CASE("http.package") zen::HttpClient TestClient(BaseUri); zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage); + + zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + + CHECK_EQ(ResponsePackage, TestPackage); } #endif diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 9600c5f8a..cf7deaa93 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1,10 +1,12 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/stream.h> #include <zencore/timer.h> #include <zenhttp/httpserver.h> @@ -15,6 +17,8 @@ #include "upstream/zen.h" #include "zenstore/cidstore.h" +#include <zencore/compactbinarypackage.h> + #include <algorithm> #include <atomic> #include <filesystem> @@ -37,7 +41,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InC , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { - // m_Log.set_level(spdlog::level::debug); } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -127,14 +130,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req case kHead: case kGet: { + const ZenContentType AcceptType = Request.AcceptContentType(); + ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); bool InUpstreamCache = false; if (!Success && m_UpstreamCache) { - const ZenContentType CacheRecordType = - Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject; + const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary + : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage + : ZenContentType::kCbObject; if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); UpstreamResult.Success) @@ -143,43 +149,85 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Success = true; InUpstreamCache = true; - if (CacheRecordType == ZenContentType::kCbObject) + if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) { - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()), - zen::CbValidateMode::All); - - if (ValidationResult == CbValidateError::None) + if (CacheRecordType == ZenContentType::kCbObject) { - zen::CbObjectView Cbo(UpstreamResult.Value.Data()); + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All); - std::vector<IoHash> References; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + if (ValidationResult == CbValidateError::None) + { + zen::CbObjectView CacheRecord(UpstreamResult.Value.Data()); - if (!References.empty()) + zen::CbObjectWriter IndexData; + IndexData.BeginArray("references"); + CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); + IndexData.EndArray(); + + Value.IndexData = IndexData.Save(); + } + else { - zen::CbObjectWriter Idx; - Idx.BeginArray("references"); - for (const IoHash& Hash : References) - { - Idx.AddHash(Hash); - } - Idx.EndArray(); - - Value.IndexData = Idx.Save(); + Success = false; + ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream", + Ref.BucketSegment, + Ref.HashKey); } } - else + + if (Success) { - Value.Value = IoBuffer(); - Success = false; - ZEN_WARN("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); } } - - if (Success) + else { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); + + CbPackage Package; + if (Package.TryLoad(UpstreamResult.Value)) + { + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + CbObject CacheRecord = Package.GetObject(); + + CacheRecord.IterateAttachments( + [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) + { + m_CidStore.AddChunk(Chunk); + FoundCount++; + } + else + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed", + Ref.BucketSegment, + Ref.HashKey); + } + } + AttachmentCount++; + }); + + if (FoundCount == AttachmentCount) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); + } + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package", + Ref.BucketSegment, + Ref.HashKey); + } + } + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey); + } } } } @@ -196,14 +244,74 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Request.SetSuppressResponseBody(); } - ZEN_DEBUG("HIT - '{}/{}' ({} bytes {}) ({})", - Ref.BucketSegment, - Ref.HashKey, - Value.Value.Size(), - Value.Value.GetContentType(), - InUpstreamCache ? "upstream" : "local"); + if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) + { + CbObjectView CacheRecord(Value.Value.Data()); + + const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All); + + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); + + return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); + } + + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + uint64_t AttachmentBytes = 0ull; + + CbPackage Package; + + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + AttachmentBytes += Chunk.Size(); + FoundCount++; + } + AttachmentCount++; + }); + + if (FoundCount != AttachmentCount) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + FoundCount, + AttachmentCount); + + return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + } + + Package.SetObject(LoadCompactBinaryObject(Value.Value)); + + ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(AttachmentBytes + Value.Value.Size()), + AttachmentCount, + InUpstreamCache ? "UPSTREAM" : "LOCAL"); + + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + + IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + } + else + { + ZEN_DEBUG("HIT - '{}/{}' {} ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Value.Value.Size()), + InUpstreamCache ? "UPSTREAM" : "LOCAL"); + + return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + } } break; @@ -218,28 +326,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req const HttpContentType ContentType = Request.RequestContentType(); - bool IsCompactBinary = false; - - switch (ContentType) - { - case HttpContentType::kUnknownContentType: - case HttpContentType::kBinary: - IsCompactBinary = false; - break; - - case HttpContentType::kCbObject: - IsCompactBinary = true; - break; - - default: - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); - } - - if (!IsCompactBinary) + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) { // TODO: create a cache record and put value in CAS? m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - ZEN_DEBUG("PUT (binary) - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Body.Size(), Body.GetContentType()); + ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); if (m_UpstreamCache) { @@ -249,86 +340,193 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req return Request.WriteResponse(zen::HttpResponseCode::Created); } - - // Validate payload before accessing it - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) + else if (ContentType == HttpContentType::kCbObject) { - ZEN_WARN("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); + // Validate payload before accessing it + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - // TODO: add details in response, kText || kCbObject? - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Compact binary validation failed"sv); - } + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", + Ref.BucketSegment, + Ref.HashKey, + Body.Size()); - // Extract referenced payload hashes - zen::CbObjectView Cbo(Body.Data()); + // TODO: add details in response, kText || kCbObject? + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Compact binary validation failed"sv); + } - std::vector<IoHash> References; - std::vector<IoHash> MissingRefs; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + // Extract referenced payload hashes + zen::CbObjectView Cbo(Body.Data()); - ZenCacheValue CacheValue; - CacheValue.Value = Body; + std::vector<IoHash> References; + std::vector<IoHash> MissingRefs; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); - if (!References.empty()) - { - zen::CbObjectWriter Idx; - Idx.BeginArray("references"); + ZenCacheValue CacheValue; + CacheValue.Value = Body; - for (const IoHash& Hash : References) + if (!References.empty()) { - Idx.AddHash(Hash); - if (!m_CidStore.ContainsChunk(Hash)) + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); + + for (const IoHash& Hash : References) { - MissingRefs.push_back(Hash); + Idx.AddHash(Hash); + if (!m_CidStore.ContainsChunk(Hash)) + { + MissingRefs.push_back(Hash); + } } + + Idx.EndArray(); + + CacheValue.IndexData = Idx.Save(); } - Idx.EndArray(); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - CacheValue.IndexData = Idx.Save(); - } + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceBytes(CacheValue.Value.Size()), + MissingRefs.size(), + References.size()); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + if (MissingRefs.empty()) + { + // Only enqueue valid cache records, i.e. all referenced payloads exists + if (m_UpstreamCache) + { + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); + } - ZEN_DEBUG("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))", - Ref.BucketSegment, - Ref.HashKey, - CacheValue.Value.Size(), - CacheValue.Value.GetContentType(), - References.size(), - MissingRefs.size()); + return Request.WriteResponse(zen::HttpResponseCode::Created); + } + else + { + // TODO: Binary attachments? + zen::CbObjectWriter Response; + Response.BeginArray("needs"); + for (const IoHash& MissingRef : MissingRefs) + { + Response.AddHash(MissingRef); + ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); + } + Response.EndArray(); - if (MissingRefs.empty()) + // Return Created | BadRequest? + return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + } + } + else if (ContentType == HttpContentType::kCbPackage) { - // Only enqueue valid cache records, i.e. all referenced payloads exists + CbPackage Package; + + if (!Package.TryLoad(Body)) + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); + } + + CbObject CacheRecord = Package.GetObject(); + + int32_t AttachmentCount = 0; + int32_t NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + bool AttachmentsOk = true; + + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + + std::vector<IoHash> PayloadIds; + PayloadIds.reserve(Attachments.size()); + + CacheRecord.IterateAttachments([this, + &Ref, + &Package, + &AttachmentsOk, + &AttachmentCount, + &TotalAttachmentBytes, + &TotalNewBytes, + &NewAttachmentCount, + &PayloadIds](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + const uint64_t ChunkSize = Chunk.GetCompressed().GetSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + PayloadIds.emplace_back(InsertResult.DecompressedId); + + if (InsertResult.New) + { + TotalNewBytes += ChunkSize; + ++NewAttachmentCount; + } + + TotalAttachmentBytes += ChunkSize; + AttachmentCount++; + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + AttachmentsOk = false; + } + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + AttachmentsOk = false; + } + }); + + if (!AttachmentsOk) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"); + } + + IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer(); + const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size(); + + ZenCacheValue CacheValue{.Value = CacheRecordChunk}; + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + if (m_UpstreamCache) { - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); + .PayloadIds = std::move(PayloadIds)}); } + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceBytes(TotalPackageBytes), + NewAttachmentCount, + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + zen::NiceBytes(TotalAttachmentBytes)); + return Request.WriteResponse(zen::HttpResponseCode::Created); } else { - // TODO: Binary attachments? - zen::CbObjectWriter Response; - Response.BeginArray("needs"); - for (const IoHash& MissingRef : MissingRefs) - { - Response.AddHash(MissingRef); - ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); - } - Response.EndArray(); - - // Return Created | BadRequest? - return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + return Request.WriteResponse(zen::HttpResponseCode::BadRequest); } } break; @@ -387,13 +585,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re return Request.WriteResponse(zen::HttpResponseCode::NotFound); } - ZEN_DEBUG("HIT - '{}/{}/{}' ({} bytes, {}) ({})", + ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, - Payload.Size(), + NiceBytes(Payload.Size()), Payload.GetContentType(), - InUpstreamCache ? "upstream" : "local"); + InUpstreamCache ? "UPSTREAM" : "LOCAL"); if (Verb == kHead) { @@ -438,13 +636,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); - ZEN_DEBUG("PUT ({}) - '{}/{}/{}' ({} bytes, {})", - Result.New ? "NEW" : "OLD", + ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, - Body.Size(), - Body.GetContentType()); + NiceBytes(Body.Size()), + Body.GetContentType(), + Result.New ? "NEW" : "OLD"); if (Result.New) { diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index c8c959569..8289fd700 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -74,6 +74,7 @@ private: void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; ZenCacheStore& m_CacheStore; zen::CasStore& m_CasStore; diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 7b76bb80b..3197eaee4 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -588,7 +588,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, TotalAttachmentBytes += CompressedSize; ++AttachmentCount; - const CasStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); + const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); if (InsertResult.New) { @@ -659,16 +659,21 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) { CbObjectView ExecEntry = It.AsObjectView(); - std::string_view Name = ExecEntry["name"sv].AsString(); - const IoHash Hash = ExecEntry["hash"sv].AsHash(); - const uint64_t Size = ExecEntry["size"sv].AsUInt64(); + std::string_view Name = ExecEntry["name"sv].AsString(); + const IoHash ChunkHash = ExecEntry["hash"sv].AsHash(); + const uint64_t Size = ExecEntry["size"sv].AsUInt64(); std::filesystem::path FilePath{SandboxPath / Name}; - IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); if (!DataBuffer) { - throw std::runtime_error("worker CAS chunk '{}' missing"_format(Hash)); + throw std::runtime_error("worker CAS chunk '{}' missing"_format(ChunkHash)); + } + + if (DataBuffer.Size() != Size) + { + throw std::runtime_error("worker CAS chunk '{}' size: {}, action spec expected {}"_format(ChunkHash, DataBuffer.Size(), Size)); } zen::WriteFile(FilePath, DataBuffer); @@ -685,16 +690,21 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) { CbObjectView FileEntry = It.AsObjectView(); - std::string_view Name = FileEntry["name"sv].AsString(); - const IoHash Hash = FileEntry["hash"sv].AsHash(); - const uint64_t Size = FileEntry["size"sv].AsUInt64(); + std::string_view Name = FileEntry["name"sv].AsString(); + const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); std::filesystem::path FilePath{SandboxPath / Name}; - IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); if (!DataBuffer) { - throw std::runtime_error("worker CAS chunk '{}' missing"_format(Hash)); + throw std::runtime_error("worker CAS chunk '{}' missing"_format(ChunkHash)); + } + + if (DataBuffer.Size() != Size) + { + throw std::runtime_error("worker CAS chunk '{}' size: {}, action spec expected {}"_format(ChunkHash, DataBuffer.Size(), Size)); } zen::WriteFile(FilePath, DataBuffer); @@ -796,11 +806,14 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) CbPackage OutputPackage; CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]); + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalRawAttachmentBytes = 0; + Output.IterateAttachments([&](CbFieldView Field) { IoHash Hash = Field.AsHash(); std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; - FileContents ChunkData = zen::ReadFile(SandboxPath / "build.output"); + FileContents ChunkData = zen::ReadFile(OutputPath); if (ChunkData.ErrorCode) { @@ -809,12 +822,27 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) ZEN_ASSERT(OutputData.Data.size() == 1); - CbAttachment Attachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0]))); + CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0])); + + if (!AttachmentBuffer) + { + throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); + } + + TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + TotalRawAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + + CbAttachment Attachment(AttachmentBuffer); OutputPackage.AddAttachment(Attachment); }); OutputPackage.SetObject(Output); + ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", + OutputPackage.GetAttachments().size(), + NiceBytes(TotalAttachmentBytes), + NiceBytes(TotalRawAttachmentBytes)); + return OutputPackage; } diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h index 474156a5e..86b262213 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/apply.h @@ -28,6 +28,7 @@ public: virtual void HandleRequest(HttpServerRequest& Request) override; private: + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; HttpRequestRouter m_Router; CasStore& m_CasStore; diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 0af92da6d..4a5467648 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -163,7 +163,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -194,7 +194,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -215,7 +215,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } std::vector<IoHash> diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 97b222a68..38d30a795 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -4,8 +4,14 @@ #include "jupiter.h" #include "zen.h" +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/stream.h> #include <zencore/timer.h> + #include <zenstore/cas.h> #include <zenstore/cidstore.h> @@ -121,7 +127,45 @@ namespace detail { } else { - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type); + const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; + Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); + + if (Result.Success && Type == ZenContentType::kCbPackage) + { + CbPackage Package; + + const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All); + if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) + { + CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); + + CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } + else + { + Result.Success = false; + } + }); + + Package.SetObject(CacheRecord); + } + + if (Result.Success) + { + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + + Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + Result.Bytes = MemStream.Size(); + } + } } return {.Value = Result.Response, @@ -305,37 +349,74 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + if (CacheRecord.Type == ZenContentType::kCbPackage) { - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + zen::CbPackage Package; + Package.SetObject(CbObject(SharedBuffer(RecordValue))); + + for (const IoBuffer& Payload : Payloads) { - Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - CacheRecord.PayloadIds[Idx], - Payloads[Idx]); + if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) + { + Package.AddAttachment(CbAttachment(AttachmentBuffer)); + } + else + { + return {.Reason = std::string("invalid payload buffer"), .Success = false}; + } } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size()); - if (!Result.Success) + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - return {.Reason = "Failed to upload payload", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; + Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + PackagePayload, + CacheRecord.Type); } - } - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + TotalBytes = Result.Bytes; + TotalElapsedSeconds = Result.ElapsedSeconds; } + else + { + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + { + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + CacheRecord.PayloadIds[Idx], + Payloads[Idx]); + } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + if (!Result.Success) + { + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; + } + } + + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = + Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + } + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + } return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } @@ -448,7 +529,6 @@ public: , m_CacheStore(CacheStore) , m_CidStore(CidStore) { - ZEN_ASSERT(m_Options.ThreadCount > 0); } virtual ~DefaultUpstreamCache() { Shutdown(); } @@ -509,7 +589,15 @@ public: { if (m_IsRunning.load()) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + if (!m_UpstreamThreads.empty()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + } + else + { + ProcessCacheRecord(std::move(CacheRecord)); + } + return {.Success = true}; } @@ -548,11 +636,19 @@ private: for (auto& Endpoint : m_Endpoints) { - if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - Result.Success) + const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (Result.Success) { m_Stats.Add(*Endpoint, Result); } + else + { + ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index eef92bab4..55ddd310f 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -5,6 +5,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/session.h> #include <zencore/stream.h> #include "cache/structuredcachestore.h" @@ -72,7 +73,7 @@ namespace detail { // Note that currently this just implements an UDP echo service for testing purposes -Mesh::Mesh(asio::io_context& IoContext) : m_IoContext(IoContext) +Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(zen::GetSessionId()) { } @@ -257,10 +258,10 @@ Mesh::IssueReceive() if (SessionId != Oid::Zero && SessionId != m_SessionId) { - const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port()); - const uint32_t Lsn = (++It)->AsUInt32(); + // const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port()); + // const uint32_t Lsn = (++It)->AsUInt32(); - ZEN_INFO("received hey from {} ({})", SenderIp, SessionId); + ZEN_TRACE("received hey from {} ({})", SenderIp, SessionId); RwLock::ExclusiveLockScope _(m_SessionsLock); @@ -279,7 +280,7 @@ Mesh::IssueReceive() { Oid SessionId = Field.AsObjectId(); - ZEN_INFO("received bye from {} ({})", SenderIp, SessionId); + ZEN_DEBUG("received bye from {} ({})", SenderIp, SessionId); // We could verify that it's sent from a known IP before erasing the // session, if we want to be paranoid @@ -389,7 +390,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + Session.SetHeader(cpr::Header{{"Accept", + Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" + : Type == ZenContentType::kCbObject ? "application/x-ue-cb" + : "application/octet-stream"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -429,14 +433,18 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader( - cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + Session.SetHeader(cpr::Header{{"Content-Type", + Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" + : Type == ZenContentType::kCbObject ? "application/x-ue-cb" + : "application/octet-stream"}}); Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } ZenCacheResult @@ -454,7 +462,9 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 541495818..ff4a551bf 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -4,6 +4,7 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> +#include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/thread.h> #include <zencore/uid.h> @@ -59,6 +60,9 @@ private: static const int kMaxMessageSize = 2048; static const int kMaxUpdateSize = 1400; // We'll try not to send messages larger than this + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; std::atomic<State> m_State = kInitializing; asio::io_context& m_IoContext; std::unique_ptr<asio::ip::udp::socket> m_UdpSocket; @@ -68,7 +72,7 @@ private: uint16_t m_Port = 0; uint8_t m_MessageBuffer[kMaxMessageSize]; asio::high_resolution_timer m_Timer{m_IoContext}; - Oid m_SessionId{Oid::NewOid()}; + Oid m_SessionId; struct PeerInfo { diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 3b56d8683..53dc41a24 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -157,7 +157,7 @@ public: zen::UpstreamCacheOptions UpstreamOptions; - if (UpstreamConfig.UpstreamThreadCount > 0 && UpstreamConfig.UpstreamThreadCount < 32) + if (UpstreamConfig.UpstreamThreadCount < 32) { UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); } diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index e6c7f98ee..100054a0e 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -28,15 +28,16 @@ struct CidStore::CidState RwLock m_Lock; tsl::robin_map<IoHash, IoHash> m_CidMap; - CasStore::InsertResult AddChunk(CompressedBuffer& ChunkData) + CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData) { - IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); - IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); + const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash()); + IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); + IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash); - AddCompressedCid(IoHash::FromBLAKE3(ChunkData.GetRawHash()), CompressedHash); + AddCompressedCid(DecompressedId, CompressedHash); - return Result; + return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New}; } void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) @@ -103,7 +104,7 @@ CidStore::~CidStore() { } -CasStore::InsertResult +CidStore::InsertResult CidStore::AddChunk(CompressedBuffer& ChunkData) { return m_Impl->AddChunk(ChunkData); diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index 62d642ad1..76a33c915 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -31,11 +31,18 @@ public: CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir); ~CidStore(); - CasStore::InsertResult AddChunk(CompressedBuffer& ChunkData); - void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed); - IoBuffer FindChunkByCid(const IoHash& DecompressedId); - bool ContainsChunk(const IoHash& DecompressedId); - void Flush(); + struct InsertResult + { + IoHash DecompressedId; + IoHash CompressedHash; + bool New = false; + }; + + InsertResult AddChunk(CompressedBuffer& ChunkData); + void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed); + IoBuffer FindChunkByCid(const IoHash& DecompressedId); + bool ContainsChunk(const IoHash& DecompressedId); + void Flush(); // TODO: add batch filter support diff --git a/zenutil/include/zenserverprocess.h b/zenutil/include/zenserverprocess.h index b81d61c25..7b41c8aba 100644 --- a/zenutil/include/zenserverprocess.h +++ b/zenutil/include/zenserverprocess.h @@ -51,7 +51,7 @@ struct ZenServerInstance m_TestDir = TestDir; } - void SpawnServer(int BasePort = 0); + void SpawnServer(int BasePort = 0, std::string_view AdditionalServerArgs = std::string_view()); void AttachToRunningServer(int BasePort = 0); diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index 00f5d4c0d..7f4be2368 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -367,7 +367,7 @@ ZenServerInstance::Shutdown() } void -ZenServerInstance::SpawnServer(int BasePort) +ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerArgs) { ZEN_ASSERT(!m_Process.IsValid()); // Only spawn once @@ -386,7 +386,7 @@ ZenServerInstance::SpawnServer(int BasePort) zen::ExtendableStringBuilder<32> LogId; LogId << "Zen" << ChildId; - zen::ExtendableWideStringBuilder<128> CommandLine; + zen::ExtendableWideStringBuilder<512> CommandLine; CommandLine << "\""; CommandLine.Append(Executable.c_str()); CommandLine << "\""; @@ -417,6 +417,11 @@ ZenServerInstance::SpawnServer(int BasePort) CommandLine << " --mesh"; } + if (!AdditionalServerArgs.empty()) + { + CommandLine << " " << AdditionalServerArgs; + } + std::filesystem::path CurrentDirectory = std::filesystem::current_path(); ZEN_DEBUG("Spawning server '{}'", LogId); |