aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-09-16 17:08:01 +0200
committerMartin Ridgers <[email protected]>2021-09-16 17:08:01 +0200
commit8da2c13a34fd6394aecaf19490d65a8a84592e3c (patch)
tree702cb3aec8145209fb5d8e39d8bf6d1432dd1a33
parentAnother missing include (diff)
parentCompact binary package caching support (#9) (diff)
downloadzen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.tar.xz
zen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.zip
Merge main into linux-mac
-rw-r--r--zencore-test/zencore-test.cpp2
-rw-r--r--zencore/except.cpp2
-rw-r--r--zencore/include/zencore/atomic.h4
-rw-r--r--zencore/include/zencore/except.h2
-rw-r--r--zencore/include/zencore/intmath.h2
-rw-r--r--zencore/include/zencore/iobuffer.h16
-rw-r--r--zencore/include/zencore/memory.h10
-rw-r--r--zencore/include/zencore/session.h4
-rw-r--r--zencore/include/zencore/zencore.h51
-rw-r--r--zencore/intmath.cpp2
-rw-r--r--zencore/memory.cpp6
-rw-r--r--zencore/session.cpp15
-rw-r--r--zencore/zencore.cpp6
-rw-r--r--zenhttp/httpclient.cpp3
-rw-r--r--zenhttp/httpserver.cpp2
-rw-r--r--zenhttp/httpshared.cpp93
-rw-r--r--zenhttp/httpshared.h8
-rw-r--r--zenhttp/httpsys.cpp3
-rw-r--r--zenhttp/include/zenhttp/httpclient.h4
-rw-r--r--zenhttp/include/zenhttp/httpshared.h51
-rw-r--r--zenhttp/include/zenhttp/zenhttp.h4
-rw-r--r--zenhttp/zenhttp.cpp6
-rw-r--r--zenserver-test/zenserver-test.cpp229
-rw-r--r--zenserver/cache/structuredcache.cpp430
-rw-r--r--zenserver/cache/structuredcache.h1
-rw-r--r--zenserver/compute/apply.cpp54
-rw-r--r--zenserver/compute/apply.h1
-rw-r--r--zenserver/upstream/jupiter.cpp6
-rw-r--r--zenserver/upstream/upstreamcache.cpp148
-rw-r--r--zenserver/upstream/zen.cpp30
-rw-r--r--zenserver/upstream/zen.h6
-rw-r--r--zenserver/zenserver.cpp2
-rw-r--r--zenstore/cidstore.cpp13
-rw-r--r--zenstore/include/zenstore/cidstore.h17
-rw-r--r--zenutil/include/zenserverprocess.h2
-rw-r--r--zenutil/zenserverprocess.cpp9
36 files changed, 971 insertions, 273 deletions
diff --git a/zencore-test/zencore-test.cpp b/zencore-test/zencore-test.cpp
index 1782f1926..559349076 100644
--- a/zencore-test/zencore-test.cpp
+++ b/zencore-test/zencore-test.cpp
@@ -1,8 +1,8 @@
// zencore-test.cpp : Defines the entry point for the console application.
//
-#include <zencore/zencore.h>
#include <zencore/logging.h>
+#include <zencore/zencore.h>
#define DOCTEST_CONFIG_IMPLEMENT
#include <doctest/doctest.h>
diff --git a/zencore/except.cpp b/zencore/except.cpp
index 75d0c8dd1..84e52ab9f 100644
--- a/zencore/except.cpp
+++ b/zencore/except.cpp
@@ -20,7 +20,7 @@ ThrowSystemException([[maybe_unused]] HRESULT hRes, [[maybe_unused]] std::string
}
}
-#endif // ZEN_PLATFORM_WINDOWS
+#endif // ZEN_PLATFORM_WINDOWS
void
ThrowLastError(std::string_view Message)
diff --git a/zencore/include/zencore/atomic.h b/zencore/include/zencore/atomic.h
index 7e261771b..bf549e21d 100644
--- a/zencore/include/zencore/atomic.h
+++ b/zencore/include/zencore/atomic.h
@@ -5,9 +5,9 @@
#include <zencore/zencore.h>
#if ZEN_COMPILER_MSC
-#include <intrin.h>
+# include <intrin.h>
#else
-#include <atomic>
+# include <atomic>
#endif
#include <cinttypes>
diff --git a/zencore/include/zencore/except.h b/zencore/include/zencore/except.h
index 90f7d45db..f0e04a795 100644
--- a/zencore/include/zencore/except.h
+++ b/zencore/include/zencore/except.h
@@ -51,7 +51,7 @@ private:
};
ZENCORE_API void ThrowSystemException(HRESULT hRes, std::string_view Message);
-#endif // ZEN_PLATFORM_WINDOWS
+#endif // ZEN_PLATFORM_WINDOWS
ZENCORE_API void ThrowLastError(std::string_view Message);
diff --git a/zencore/include/zencore/intmath.h b/zencore/include/zencore/intmath.h
index 2fdea22b5..7619e1950 100644
--- a/zencore/include/zencore/intmath.h
+++ b/zencore/include/zencore/intmath.h
@@ -179,6 +179,6 @@ Max(auto x, auto y)
//////////////////////////////////////////////////////////////////////////
-void intmath_forcelink(); // internal
+void intmath_forcelink(); // internal
} // namespace zen
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h
index f3120983e..298952dd6 100644
--- a/zencore/include/zencore/iobuffer.h
+++ b/zencore/include/zencore/iobuffer.h
@@ -15,14 +15,14 @@ struct IoBufferExtendedCore;
enum class ZenContentType : uint8_t
{
- kBinary = 0, // Note that since this is zero, this will be the default value in IoBuffer
- kText = 1,
- kJSON = 2,
- kCbObject = 3,
- kCbPackage = 4,
- kYAML = 5,
- kCbPackageOffer = 6,
- kCompressedBinary = 7,
+ kBinary = 0, // Note that since this is zero, this will be the default value in IoBuffer
+ kText = 1,
+ kJSON = 2,
+ kCbObject = 3,
+ kCbPackage = 4,
+ kYAML = 5,
+ kCbPackageOffer = 6,
+ kCompressedBinary = 7,
kUnknownContentType = 8,
kCOUNT
};
diff --git a/zencore/include/zencore/memory.h b/zencore/include/zencore/memory.h
index 9d6339595..3d4db1081 100644
--- a/zencore/include/zencore/memory.h
+++ b/zencore/include/zencore/memory.h
@@ -83,7 +83,7 @@ struct MutableMemoryView
{
}
- inline bool IsEmpty() const { return m_Data == m_DataEnd; }
+ inline bool IsEmpty() const { return m_Data == m_DataEnd; }
void* GetData() const { return m_Data; }
void* GetDataEnd() const { return m_DataEnd; }
size_t GetSize() const { return reinterpret_cast<uint8_t*>(m_DataEnd) - reinterpret_cast<uint8_t*>(m_Data); }
@@ -194,10 +194,10 @@ struct MemoryView
{
}
- inline bool Contains(const MemoryView& Other) const { return (m_Data <= Other.m_Data) && (m_DataEnd >= Other.m_DataEnd); }
- inline bool IsEmpty() const { return m_Data == m_DataEnd; }
- const void* GetData() const { return m_Data; }
- const void* GetDataEnd() const { return m_DataEnd; }
+ inline bool Contains(const MemoryView& Other) const { return (m_Data <= Other.m_Data) && (m_DataEnd >= Other.m_DataEnd); }
+ inline bool IsEmpty() const { return m_Data == m_DataEnd; }
+ const void* GetData() const { return m_Data; }
+ const void* GetDataEnd() const { return m_DataEnd; }
size_t GetSize() const { return reinterpret_cast<const uint8_t*>(m_DataEnd) - reinterpret_cast<const uint8_t*>(m_Data); }
inline bool operator==(const MemoryView& Rhs) const { return m_Data == Rhs.m_Data && m_DataEnd == Rhs.m_DataEnd; }
diff --git a/zencore/include/zencore/session.h b/zencore/include/zencore/session.h
index e66794704..2da41b2c8 100644
--- a/zencore/include/zencore/session.h
+++ b/zencore/include/zencore/session.h
@@ -1,3 +1,5 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
#pragma once
#include <zencore/zencore.h>
@@ -8,4 +10,4 @@ struct Oid;
ZENCORE_API Oid GetSessionId();
-}
+} // namespace zen
diff --git a/zencore/include/zencore/zencore.h b/zencore/include/zencore/zencore.h
index 73446b447..54df7e85e 100644
--- a/zencore/include/zencore/zencore.h
+++ b/zencore/include/zencore/zencore.h
@@ -10,39 +10,39 @@
// Platform
//
-#define ZEN_PLATFORM_WINDOWS 0
-#define ZEN_PLATFORM_LINUX 0
-#define ZEN_PLATFORM_MACOS 0
+#define ZEN_PLATFORM_WINDOWS 0
+#define ZEN_PLATFORM_LINUX 0
+#define ZEN_PLATFORM_MACOS 0
#ifdef _WIN32
-# undef ZEN_PLATFORM_WINDOWS
-# define ZEN_PLATFORM_WINDOWS 1
+# undef ZEN_PLATFORM_WINDOWS
+# define ZEN_PLATFORM_WINDOWS 1
#elif defined(__linux__)
-# undef ZEN_PLATFORM_LINUX
-# define ZEN_PLATFORM_LINUX 1
+# undef ZEN_PLATFORM_LINUX
+# define ZEN_PLATFORM_LINUX 1
#elif defined(__APPLE__)
-# undef ZEN_PLATFORM_MACOS
-# define ZEN_PLATFORM_MACOS 1
+# undef ZEN_PLATFORM_MACOS
+# define ZEN_PLATFORM_MACOS 1
#endif
//////////////////////////////////////////////////////////////////////////
// Compiler
//
-#define ZEN_COMPILER_CLANG 0
-#define ZEN_COMPILER_MSC 0
-#define ZEN_COMPILER_GCC 0
+#define ZEN_COMPILER_CLANG 0
+#define ZEN_COMPILER_MSC 0
+#define ZEN_COMPILER_GCC 0
// Clang can define __GNUC__ and/or _MSC_VER so we check for Clang first
#ifdef __clang__
-# undef ZEN_COMPILER_CLANG
-# define ZEN_COMPILER_CLANG 1
+# undef ZEN_COMPILER_CLANG
+# define ZEN_COMPILER_CLANG 1
#elif defined(_MSC_VER)
-# undef ZEN_COMPILER_MSC
-# define ZEN_COMPILER_MSC 1
+# undef ZEN_COMPILER_MSC
+# define ZEN_COMPILER_MSC 1
#elif defined(__GNUC__)
-# undef ZEN_COMPILER_GCC
-# define ZEN_COMPILER_GCC 1
+# undef ZEN_COMPILER_GCC
+# define ZEN_COMPILER_GCC 1
#else
# error Unknown compiler
#endif
@@ -56,17 +56,16 @@
# endif
#endif
-
//////////////////////////////////////////////////////////////////////////
// Architecture
//
#if defined(__amd64__) || defined(_M_X64)
-# define ZEN_ARCH_X64 1
-# define ZEN_ARCH_ARM64 0
+# define ZEN_ARCH_X64 1
+# define ZEN_ARCH_ARM64 0
#elif defined(__arm64__) || defined(_M_ARM64)
-# define ZEN_ARCH_X64 0
-# define ZEN_ARCH_ARM64 1
+# define ZEN_ARCH_X64 0
+# define ZEN_ARCH_ARM64 1
#else
# error Unknown architecture
#endif
@@ -142,13 +141,13 @@ char (&ZenArrayCountHelper(const T (&)[N]))[N + 1];
//////////////////////////////////////////////////////////////////////////
#if ZEN_COMPILER_MSC
-# define ZEN_NOINLINE __declspec(noinline)
+# define ZEN_NOINLINE __declspec(noinline)
#else
-# define ZEN_NOINLINE __attribute__((noinline))
+# define ZEN_NOINLINE __attribute__((noinline))
#endif
#define ZEN_UNUSED(...) ((void)__VA_ARGS__)
-#define ZEN_NOT_IMPLEMENTED(...) ZEN_ASSERT(false)
+#define ZEN_NOT_IMPLEMENTED(...) ZEN_ASSERT(false, __VA_ARGS__)
#define ZENCORE_API // Placeholder to allow DLL configs in the future
ZENCORE_API bool IsPointerToStack(const void* ptr); // Query if pointer is within the stack of the currently executing thread
diff --git a/zencore/intmath.cpp b/zencore/intmath.cpp
index ae65085b6..98c345c79 100644
--- a/zencore/intmath.cpp
+++ b/zencore/intmath.cpp
@@ -58,4 +58,4 @@ TEST_CASE("intmath")
CHECK(ByteSwap(uint64_t(0x214d'6172'7469'6e21ull)) == 0x216e'6974'7261'4d21ull);
}
-} // namespace zen
+} // namespace zen
diff --git a/zencore/memory.cpp b/zencore/memory.cpp
index 9c7fb8333..26c8321e5 100644
--- a/zencore/memory.cpp
+++ b/zencore/memory.cpp
@@ -15,7 +15,8 @@ namespace zen {
//////////////////////////////////////////////////////////////////////////
-static void* AlignedAllocImpl(size_t size, size_t alignment)
+static void*
+AlignedAllocImpl(size_t size, size_t alignment)
{
#if ZEN_PLATFORM_WINDOWS
// return _aligned_malloc(size, alignment); // MSVC alternative
@@ -26,7 +27,8 @@ static void* AlignedAllocImpl(size_t size, size_t alignment)
#endif
}
-void AlignedFreeImpl(void* ptr)
+void
+AlignedFreeImpl(void* ptr)
{
if (ptr == nullptr)
return;
diff --git a/zencore/session.cpp b/zencore/session.cpp
index 195a9d97c..d57d3685b 100644
--- a/zencore/session.cpp
+++ b/zencore/session.cpp
@@ -1,3 +1,5 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
#include "zencore/session.h"
#include <zencore/uid.h>
@@ -6,16 +8,15 @@
namespace zen {
-static Oid GlobalSessionId;
+static Oid GlobalSessionId;
static std::once_flag SessionInitFlag;
-Oid GetSessionId()
+Oid
+GetSessionId()
{
- std::call_once(SessionInitFlag, [&] {
- GlobalSessionId.Generate();
- });
+ std::call_once(SessionInitFlag, [&] { GlobalSessionId.Generate(); });
- return GlobalSessionId;
+ return GlobalSessionId;
}
-} \ No newline at end of file
+} // namespace zen \ No newline at end of file
diff --git a/zencore/zencore.cpp b/zencore/zencore.cpp
index 56bdd2ae8..f9b19ba9d 100644
--- a/zencore/zencore.cpp
+++ b/zencore/zencore.cpp
@@ -3,11 +3,11 @@
#include <zencore/zencore.h>
#if ZEN_PLATFORM_WINDOWS
-#include <zencore/windows.h>
+# include <zencore/windows.h>
#endif
#if ZEN_PLATFORM_LINUX
-#include <pthread.h>
+# include <pthread.h>
#endif
#include <zencore/blake3.h>
@@ -46,7 +46,7 @@ IsPointerToStack(const void* ptr)
pthread_attr_t attr;
pthread_getattr_np(self, &attr);
- void* low;
+ void* low;
size_t size;
pthread_attr_getstack(&attr, &low, &size);
diff --git a/zenhttp/httpclient.cpp b/zenhttp/httpclient.cpp
index 7e3e9d374..fb1df30b2 100644
--- a/zenhttp/httpclient.cpp
+++ b/zenhttp/httpclient.cpp
@@ -10,8 +10,7 @@
#include <zencore/session.h>
#include <zencore/sharedbuffer.h>
#include <zencore/stream.h>
-
-#include "httpshared.h"
+#include <zenhttp/httpshared.h>
#include <doctest/doctest.h>
diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp
index f4a5f4345..62ee66a08 100644
--- a/zenhttp/httpserver.cpp
+++ b/zenhttp/httpserver.cpp
@@ -3,7 +3,6 @@
#include <zenhttp/httpserver.h>
#include "httpnull.h"
-#include "httpshared.h"
#include "httpsys.h"
#include "httpuws.h"
@@ -15,6 +14,7 @@
#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/thread.h>
+#include <zenhttp/httpshared.h>
#include <conio.h>
#include <new.h>
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp
index 68252a763..2dbf95959 100644
--- a/zenhttp/httpshared.cpp
+++ b/zenhttp/httpshared.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "httpshared.h"
+#include <zenhttp/httpshared.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compositebuffer.h>
@@ -58,22 +58,54 @@ FormatPackageMessage(const CbPackage& Data)
IoBuffer RootIoBuffer = Data.GetObject().GetBuffer().AsIoBuffer();
ResponseBuffers.push_back(RootIoBuffer); // Root object
- *AttachmentInfo++ = {.AttachmentSize = RootIoBuffer.Size(), .AttachmentHash = Data.GetObjectHash()};
+ *AttachmentInfo++ = {.AttachmentSize = RootIoBuffer.Size(),
+ .Flags = CbAttachmentEntry::kIsObject,
+ .AttachmentHash = Data.GetObjectHash()};
// Attachment payloads
for (const CbAttachment& Attachment : Attachments)
{
- CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary();
- CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
+ if (Attachment.IsNull())
+ {
+ ZEN_NOT_IMPLEMENTED("Null attachments are not supported");
+ }
+ else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary())
+ {
+ CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
- *AttachmentInfo++ = {.AttachmentSize = AttachmentBuffer.GetCompressedSize(),
- .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
+ *AttachmentInfo++ = {.AttachmentSize = AttachmentBuffer.GetCompressedSize(),
+ .Flags = CbAttachmentEntry::kIsCompressed,
+ .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
- for (const SharedBuffer& Segment : Compressed.GetSegments())
+ for (const SharedBuffer& Segment : Compressed.GetSegments())
+ {
+ ResponseBuffers.push_back(Segment.AsIoBuffer());
+ TotalAttachmentsSize += Segment.GetSize();
+ }
+ }
+ else if (CbObject AttachmentObject = Attachment.AsObject())
{
- ResponseBuffers.push_back(Segment.AsIoBuffer());
- TotalAttachmentsSize += Segment.GetSize();
+ IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer();
+ ResponseBuffers.push_back(ObjIoBuffer);
+
+ *AttachmentInfo++ = {.AttachmentSize = ObjIoBuffer.Size(),
+ .Flags = CbAttachmentEntry::kIsObject,
+ .AttachmentHash = Attachment.GetHash()};
+ }
+ else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary())
+ {
+ *AttachmentInfo++ = {.AttachmentSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
+
+ for (const SharedBuffer& Segment : AttachmentBinary.GetSegments())
+ {
+ ResponseBuffers.push_back(Segment.AsIoBuffer());
+ TotalAttachmentsSize += Segment.GetSize();
+ }
+ }
+ else
+ {
+ ZEN_NOT_IMPLEMENTED("Unknown attachment kind");
}
}
@@ -119,16 +151,45 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
Reader.Read(AttachmentBuffer.MutableData(), AttachmentSize);
- CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer)));
-
- if (i == 0)
+ if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
{
- Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf)));
+ CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer)));
+
+ if (Entry.Flags & CbAttachmentEntry::kIsObject)
+ {
+ if (i == 0)
+ {
+ Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf)));
+ }
+ else
+ {
+ ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported");
+ }
+ }
+ else
+ {
+ CbAttachment Attachment(std::move(CompBuf));
+ Package.AddAttachment(Attachment);
+ }
}
- else
+ else /* not compressed */
{
- CbAttachment Attachment(std::move(CompBuf));
- Package.AddAttachment(Attachment);
+ if (Entry.Flags & CbAttachmentEntry::kIsObject)
+ {
+ if (i == 0)
+ {
+ Package.SetObject(LoadCompactBinaryObject(AttachmentBuffer));
+ }
+ else
+ {
+ ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported");
+ }
+ }
+ else
+ {
+ CbAttachment Attachment(SharedBuffer{AttachmentBuffer});
+ Package.AddAttachment(Attachment);
+ }
}
}
diff --git a/zenhttp/httpshared.h b/zenhttp/httpshared.h
index 06fdb104f..92c1ef9c6 100644
--- a/zenhttp/httpshared.h
+++ b/zenhttp/httpshared.h
@@ -28,8 +28,14 @@ static constinit uint32_t kCbPkgMagic = 0xaa77aacc;
struct CbAttachmentEntry
{
uint64_t AttachmentSize;
- uint32_t Reserved1;
+ uint32_t Flags;
IoHash AttachmentHash;
+
+ enum
+ {
+ kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format
+ kIsObject = (1u << 1), // Is compact binary object
+ };
};
static_assert(sizeof(CbAttachmentEntry) == 32);
diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp
index d4c58bffd..d70c88271 100644
--- a/zenhttp/httpsys.cpp
+++ b/zenhttp/httpsys.cpp
@@ -2,8 +2,6 @@
#include "httpsys.h"
-#include "httpshared.h"
-
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
@@ -11,6 +9,7 @@
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
+#include <zenhttp/httpshared.h>
#if ZEN_WITH_HTTPSYS
diff --git a/zenhttp/include/zenhttp/httpclient.h b/zenhttp/include/zenhttp/httpclient.h
index 3e342f2bd..aa36a8027 100644
--- a/zenhttp/include/zenhttp/httpclient.h
+++ b/zenhttp/include/zenhttp/httpclient.h
@@ -24,8 +24,8 @@ namespace zen {
class CbPackage;
/** HTTP client implementation for Zen use cases
-
- Currently simple and synchronous, should become lean and asynchronous
+
+ Currently simple and synchronous, should become lean and asynchronous
*/
class HttpClient
{
diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h
new file mode 100644
index 000000000..92c1ef9c6
--- /dev/null
+++ b/zenhttp/include/zenhttp/httpshared.h
@@ -0,0 +1,51 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+
+#include <functional>
+
+namespace zen {
+
+class IoBuffer;
+class CbPackage;
+class CompositeBuffer;
+
+struct CbPackageHeader
+{
+ uint32_t HeaderMagic;
+ uint32_t AttachmentCount;
+ uint32_t Reserved1;
+ uint32_t Reserved2;
+};
+
+static_assert(sizeof(CbPackageHeader) == 16);
+
+static constinit uint32_t kCbPkgMagic = 0xaa77aacc;
+
+struct CbAttachmentEntry
+{
+ uint64_t AttachmentSize;
+ uint32_t Flags;
+ IoHash AttachmentHash;
+
+ enum
+ {
+ kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format
+ kIsObject = (1u << 1), // Is compact binary object
+ };
+};
+
+static_assert(sizeof(CbAttachmentEntry) == 32);
+
+std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data);
+CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data);
+CbPackage ParsePackageMessage(
+ IoBuffer Payload,
+ std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer {
+ return IoBuffer{Size};
+ });
+
+} // namespace zen
diff --git a/zenhttp/include/zenhttp/zenhttp.h b/zenhttp/include/zenhttp/zenhttp.h
index 586fc98b4..165f34b48 100644
--- a/zenhttp/include/zenhttp/zenhttp.h
+++ b/zenhttp/include/zenhttp/zenhttp.h
@@ -8,6 +8,6 @@
namespace zen {
- ZENHTTP_API void zenhttp_forcelinktests();
-
+ZENHTTP_API void zenhttp_forcelinktests();
+
}
diff --git a/zenhttp/zenhttp.cpp b/zenhttp/zenhttp.cpp
index 148cf4499..637486f55 100644
--- a/zenhttp/zenhttp.cpp
+++ b/zenhttp/zenhttp.cpp
@@ -1,3 +1,5 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
#include <zenhttp/zenhttp.h>
#include <zenhttp/httpserver.h>
@@ -7,7 +9,7 @@ namespace zen {
void
zenhttp_forcelinktests()
{
- http_forcelink();
+ http_forcelink();
}
-} \ No newline at end of file
+} // namespace zen \ No newline at end of file
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 213648319..1a41a5541 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -11,11 +11,14 @@
#include <zencore/fmtutils.h>
#include <zencore/iohash.h>
#include <zencore/logging.h>
+#include <zencore/memory.h>
+#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
-#include <zenhttp/zenhttp.h>
#include <zenhttp/httpclient.h>
+#include <zenhttp/httpshared.h>
+#include <zenhttp/zenhttp.h>
#include <zenserverprocess.h>
#include <mimalloc.h>
@@ -33,6 +36,7 @@
#include <filesystem>
#include <map>
#include <random>
+#include <span>
#include <atlbase.h>
#include <process.h>
@@ -194,6 +198,8 @@ private:
size_t rv = http_parser_execute(&m_HttpParser, &m_HttpParserSettings, (const char*)m_ResponseBuffer.data(), Bytes);
+ ZEN_UNUSED(rv);
+
if (m_HttpParser.http_errno != 0)
{
// Something bad!
@@ -1088,7 +1094,7 @@ TEST_CASE("project.pipe")
}
# endif
-TEST_CASE("z$.basic")
+TEST_CASE("zcache.basic")
{
using namespace std::literals;
@@ -1179,6 +1185,221 @@ TEST_CASE("z$.basic")
}
}
+TEST_CASE("zcache.cbpackage")
+{
+ using namespace std::literals;
+
+ auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
+ auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
+ auto CompressedData = zen::CompressedBuffer::Compress(Data);
+
+ OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash());
+
+ zen::CbWriter Obj;
+ Obj.BeginObject("obj"sv);
+ Obj.AddBinaryAttachment("data", OutAttachmentKey);
+ Obj.EndObject();
+
+ zen::CbPackage Package;
+ Package.SetObject(Obj.Save().AsObject());
+ Package.AddAttachment(zen::CbAttachment(CompressedData));
+
+ return Package;
+ };
+
+ auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
+ zen::MemoryOutStream MemStream;
+ zen::BinaryWriter Writer(MemStream);
+
+ Package.Save(Writer);
+
+ return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ };
+
+ auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool {
+ std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments();
+ std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments();
+
+ if (LhsAttachments.size() != LhsAttachments.size())
+ {
+ return false;
+ }
+
+ for (const zen::CbAttachment& LhsAttachment : LhsAttachments)
+ {
+ const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash());
+ CHECK(RhsAttachment);
+
+ zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress();
+ CHECK(!LhsBuffer.IsNull());
+
+ zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress();
+ CHECK(!RhsBuffer.IsNull());
+
+ if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView()))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ };
+
+ SUBCASE("PUT/GET returns correct package")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const uint16_t PortNumber = 13337;
+ const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber);
+
+ ZenServerInstance Instance1(TestEnv);
+ Instance1.SetTestDir(TestDir);
+ Instance1.SpawnServer(PortNumber);
+ Instance1.WaitUntilReady();
+
+ const std::string_view Bucket = "mosdef"sv;
+ zen::IoHash Key;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+
+ // PUT
+ {
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Body.Data(), Body.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ // GET
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Response);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+ }
+
+ SUBCASE("PUT propagates upstream")
+ {
+ // Setup local and remote server
+ std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+ const uint16_t LocalPortNumber = 13337;
+ const uint16_t RemotePortNumber = 13338;
+
+ const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber);
+ const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber);
+
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetTestDir(RemoteDataDir);
+ RemoteInstance.SpawnServer(RemotePortNumber);
+
+ ZenServerInstance LocalInstance(TestEnv);
+ LocalInstance.SetTestDir(LocalDataDir);
+ LocalInstance.SpawnServer(LocalPortNumber,
+ "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber));
+
+ LocalInstance.WaitUntilReady();
+ RemoteInstance.WaitUntilReady();
+
+ const std::string_view Bucket = "mosdef"sv;
+ zen::IoHash Key;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+
+ // Store the cache record package in the local instance
+ {
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Body.Data(), Body.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.status_code == 201);
+ }
+
+ // The cache record can be retrieved as a package from the local instance
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+
+ // The cache record can be retrieved as a package from the remote instance
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+ }
+
+ SUBCASE("GET finds upstream when missing in local")
+ {
+ // Setup local and remote server
+ std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+ const uint16_t LocalPortNumber = 13337;
+ const uint16_t RemotePortNumber = 13338;
+
+ const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber);
+ const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber);
+
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetTestDir(RemoteDataDir);
+ RemoteInstance.SpawnServer(RemotePortNumber);
+
+ ZenServerInstance LocalInstance(TestEnv);
+ LocalInstance.SetTestDir(LocalDataDir);
+ LocalInstance.SpawnServer(LocalPortNumber,
+ "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber));
+
+ LocalInstance.WaitUntilReady();
+ RemoteInstance.WaitUntilReady();
+
+ const std::string_view Bucket = "mosdef"sv;
+ zen::IoHash Key;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+
+ // Store the cache record package in upstream cache
+ {
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Body.Data(), Body.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.status_code == 201);
+ }
+
+ // The cache record can be retrieved as a package from the local cache
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+ }
+}
+
struct RemoteExecutionRequest
{
RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath)
@@ -1482,6 +1703,10 @@ TEST_CASE("http.package")
zen::HttpClient TestClient(BaseUri);
zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage);
+
+ zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
+
+ CHECK_EQ(ResponsePackage, TestPackage);
}
#endif
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 9600c5f8a..cf7deaa93 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1,10 +1,12 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zenhttp/httpserver.h>
@@ -15,6 +17,8 @@
#include "upstream/zen.h"
#include "zenstore/cidstore.h"
+#include <zencore/compactbinarypackage.h>
+
#include <algorithm>
#include <atomic>
#include <filesystem>
@@ -37,7 +41,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InC
, m_CidStore(InCidStore)
, m_UpstreamCache(std::move(UpstreamCache))
{
- // m_Log.set_level(spdlog::level::debug);
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -127,14 +130,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
case kHead:
case kGet:
{
+ const ZenContentType AcceptType = Request.AcceptContentType();
+
ZenCacheValue Value;
bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
bool InUpstreamCache = false;
if (!Success && m_UpstreamCache)
{
- const ZenContentType CacheRecordType =
- Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject;
+ const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary
+ : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage
+ : ZenContentType::kCbObject;
if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
UpstreamResult.Success)
@@ -143,43 +149,85 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Success = true;
InUpstreamCache = true;
- if (CacheRecordType == ZenContentType::kCbObject)
+ if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject)
{
- const zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()),
- zen::CbValidateMode::All);
-
- if (ValidationResult == CbValidateError::None)
+ if (CacheRecordType == ZenContentType::kCbObject)
{
- zen::CbObjectView Cbo(UpstreamResult.Value.Data());
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All);
- std::vector<IoHash> References;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ if (ValidationResult == CbValidateError::None)
+ {
+ zen::CbObjectView CacheRecord(UpstreamResult.Value.Data());
- if (!References.empty())
+ zen::CbObjectWriter IndexData;
+ IndexData.BeginArray("references");
+ CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
+ IndexData.EndArray();
+
+ Value.IndexData = IndexData.Save();
+ }
+ else
{
- zen::CbObjectWriter Idx;
- Idx.BeginArray("references");
- for (const IoHash& Hash : References)
- {
- Idx.AddHash(Hash);
- }
- Idx.EndArray();
-
- Value.IndexData = Idx.Save();
+ Success = false;
+ ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream",
+ Ref.BucketSegment,
+ Ref.HashKey);
}
}
- else
+
+ if (Success)
{
- Value.Value = IoBuffer();
- Success = false;
- ZEN_WARN("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
}
}
-
- if (Success)
+ else
{
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage);
+
+ CbPackage Package;
+ if (Package.TryLoad(UpstreamResult.Value))
+ {
+ uint32_t AttachmentCount = 0;
+ uint32_t FoundCount = 0;
+ CbObject CacheRecord = Package.GetObject();
+
+ CacheRecord.IterateAttachments(
+ [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
+ {
+ m_CidStore.AddChunk(Chunk);
+ FoundCount++;
+ }
+ else
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
+ }
+ AttachmentCount++;
+ });
+
+ if (FoundCount == AttachmentCount)
+ {
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey);
+ }
}
}
}
@@ -196,14 +244,74 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Request.SetSuppressResponseBody();
}
- ZEN_DEBUG("HIT - '{}/{}' ({} bytes {}) ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- Value.Value.Size(),
- Value.Value.GetContentType(),
- InUpstreamCache ? "upstream" : "local");
+ if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache)
+ {
+ CbObjectView CacheRecord(Value.Value.Data());
+
+ const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All);
+
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey);
+
+ return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
+ }
+
+ uint32_t AttachmentCount = 0;
+ uint32_t FoundCount = 0;
+ uint64_t AttachmentBytes = 0ull;
+
+ CbPackage Package;
+
+ CacheRecord.IterateAttachments(
+ [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ AttachmentBytes += Chunk.Size();
+ FoundCount++;
+ }
+ AttachmentCount++;
+ });
+
+ if (FoundCount != AttachmentCount)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ FoundCount,
+ AttachmentCount);
+
+ return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ }
+
+ Package.SetObject(LoadCompactBinaryObject(Value.Value));
+
+ ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(AttachmentBytes + Value.Value.Size()),
+ AttachmentCount,
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
+
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+
+ IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
+ }
+ else
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} ({})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Value.Value.Size()),
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
+
+ return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ }
}
break;
@@ -218,28 +326,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
const HttpContentType ContentType = Request.RequestContentType();
- bool IsCompactBinary = false;
-
- switch (ContentType)
- {
- case HttpContentType::kUnknownContentType:
- case HttpContentType::kBinary:
- IsCompactBinary = false;
- break;
-
- case HttpContentType::kCbObject:
- IsCompactBinary = true;
- break;
-
- default:
- return Request.WriteResponse(zen::HttpResponseCode::BadRequest);
- }
-
- if (!IsCompactBinary)
+ if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType)
{
// TODO: create a cache record and put value in CAS?
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- ZEN_DEBUG("PUT (binary) - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Body.Size(), Body.GetContentType());
+ ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()));
if (m_UpstreamCache)
{
@@ -249,86 +340,193 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
return Request.WriteResponse(zen::HttpResponseCode::Created);
}
-
- // Validate payload before accessing it
- const zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
-
- if (ValidationResult != CbValidateError::None)
+ else if (ContentType == HttpContentType::kCbObject)
{
- ZEN_WARN("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size());
+ // Validate payload before accessing it
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
- // TODO: add details in response, kText || kCbObject?
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Compact binary validation failed"sv);
- }
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Body.Size());
- // Extract referenced payload hashes
- zen::CbObjectView Cbo(Body.Data());
+ // TODO: add details in response, kText || kCbObject?
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Compact binary validation failed"sv);
+ }
- std::vector<IoHash> References;
- std::vector<IoHash> MissingRefs;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ // Extract referenced payload hashes
+ zen::CbObjectView Cbo(Body.Data());
- ZenCacheValue CacheValue;
- CacheValue.Value = Body;
+ std::vector<IoHash> References;
+ std::vector<IoHash> MissingRefs;
+ Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
- if (!References.empty())
- {
- zen::CbObjectWriter Idx;
- Idx.BeginArray("references");
+ ZenCacheValue CacheValue;
+ CacheValue.Value = Body;
- for (const IoHash& Hash : References)
+ if (!References.empty())
{
- Idx.AddHash(Hash);
- if (!m_CidStore.ContainsChunk(Hash))
+ zen::CbObjectWriter Idx;
+ Idx.BeginArray("references");
+
+ for (const IoHash& Hash : References)
{
- MissingRefs.push_back(Hash);
+ Idx.AddHash(Hash);
+ if (!m_CidStore.ContainsChunk(Hash))
+ {
+ MissingRefs.push_back(Hash);
+ }
}
+
+ Idx.EndArray();
+
+ CacheValue.IndexData = Idx.Save();
}
- Idx.EndArray();
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
- CacheValue.IndexData = Idx.Save();
- }
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceBytes(CacheValue.Value.Size()),
+ MissingRefs.size(),
+ References.size());
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ if (MissingRefs.empty())
+ {
+ // Only enqueue valid cache records, i.e. all referenced payloads exists
+ if (m_UpstreamCache)
+ {
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(References)});
+ }
- ZEN_DEBUG("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))",
- Ref.BucketSegment,
- Ref.HashKey,
- CacheValue.Value.Size(),
- CacheValue.Value.GetContentType(),
- References.size(),
- MissingRefs.size());
+ return Request.WriteResponse(zen::HttpResponseCode::Created);
+ }
+ else
+ {
+ // TODO: Binary attachments?
+ zen::CbObjectWriter Response;
+ Response.BeginArray("needs");
+ for (const IoHash& MissingRef : MissingRefs)
+ {
+ Response.AddHash(MissingRef);
+ ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
+ }
+ Response.EndArray();
- if (MissingRefs.empty())
+ // Return Created | BadRequest?
+ return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save());
+ }
+ }
+ else if (ContentType == HttpContentType::kCbPackage)
{
- // Only enqueue valid cache records, i.e. all referenced payloads exists
+ CbPackage Package;
+
+ if (!Package.TryLoad(Body))
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey);
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
+ }
+
+ CbObject CacheRecord = Package.GetObject();
+
+ int32_t AttachmentCount = 0;
+ int32_t NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+ bool AttachmentsOk = true;
+
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+
+ std::vector<IoHash> PayloadIds;
+ PayloadIds.reserve(Attachments.size());
+
+ CacheRecord.IterateAttachments([this,
+ &Ref,
+ &Package,
+ &AttachmentsOk,
+ &AttachmentCount,
+ &TotalAttachmentBytes,
+ &TotalNewBytes,
+ &NewAttachmentCount,
+ &PayloadIds](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ const uint64_t ChunkSize = Chunk.GetCompressed().GetSize();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+
+ PayloadIds.emplace_back(InsertResult.DecompressedId);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += ChunkSize;
+ ++NewAttachmentCount;
+ }
+
+ TotalAttachmentBytes += ChunkSize;
+ AttachmentCount++;
+ }
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
+ AttachmentsOk = false;
+ }
+ }
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
+ AttachmentsOk = false;
+ }
+ });
+
+ if (!AttachmentsOk)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments");
+ }
+
+ IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer();
+ const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size();
+
+ ZenCacheValue CacheValue{.Value = CacheRecordChunk};
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+
if (m_UpstreamCache)
{
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(References)});
+ .PayloadIds = std::move(PayloadIds)});
}
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceBytes(TotalPackageBytes),
+ NewAttachmentCount,
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ zen::NiceBytes(TotalAttachmentBytes));
+
return Request.WriteResponse(zen::HttpResponseCode::Created);
}
else
{
- // TODO: Binary attachments?
- zen::CbObjectWriter Response;
- Response.BeginArray("needs");
- for (const IoHash& MissingRef : MissingRefs)
- {
- Response.AddHash(MissingRef);
- ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
- }
- Response.EndArray();
-
- // Return Created | BadRequest?
- return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save());
+ return Request.WriteResponse(zen::HttpResponseCode::BadRequest);
}
}
break;
@@ -387,13 +585,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
return Request.WriteResponse(zen::HttpResponseCode::NotFound);
}
- ZEN_DEBUG("HIT - '{}/{}/{}' ({} bytes, {}) ({})",
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})",
Ref.BucketSegment,
Ref.HashKey,
Ref.PayloadId,
- Payload.Size(),
+ NiceBytes(Payload.Size()),
Payload.GetContentType(),
- InUpstreamCache ? "upstream" : "local");
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
if (Verb == kHead)
{
@@ -438,13 +636,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
- ZEN_DEBUG("PUT ({}) - '{}/{}/{}' ({} bytes, {})",
- Result.New ? "NEW" : "OLD",
+ ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}",
Ref.BucketSegment,
Ref.HashKey,
Ref.PayloadId,
- Body.Size(),
- Body.GetContentType());
+ NiceBytes(Body.Size()),
+ Body.GetContentType(),
+ Result.New ? "NEW" : "OLD");
if (Result.New)
{
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index c8c959569..8289fd700 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -74,6 +74,7 @@ private:
void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
+ spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
ZenCacheStore& m_CacheStore;
zen::CasStore& m_CasStore;
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 7b76bb80b..3197eaee4 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -588,7 +588,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
TotalAttachmentBytes += CompressedSize;
++AttachmentCount;
- const CasStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView);
+ const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView);
if (InsertResult.New)
{
@@ -659,16 +659,21 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
{
CbObjectView ExecEntry = It.AsObjectView();
- std::string_view Name = ExecEntry["name"sv].AsString();
- const IoHash Hash = ExecEntry["hash"sv].AsHash();
- const uint64_t Size = ExecEntry["size"sv].AsUInt64();
+ std::string_view Name = ExecEntry["name"sv].AsString();
+ const IoHash ChunkHash = ExecEntry["hash"sv].AsHash();
+ const uint64_t Size = ExecEntry["size"sv].AsUInt64();
std::filesystem::path FilePath{SandboxPath / Name};
- IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+ IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash);
if (!DataBuffer)
{
- throw std::runtime_error("worker CAS chunk '{}' missing"_format(Hash));
+ throw std::runtime_error("worker CAS chunk '{}' missing"_format(ChunkHash));
+ }
+
+ if (DataBuffer.Size() != Size)
+ {
+ throw std::runtime_error("worker CAS chunk '{}' size: {}, action spec expected {}"_format(ChunkHash, DataBuffer.Size(), Size));
}
zen::WriteFile(FilePath, DataBuffer);
@@ -685,16 +690,21 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
{
CbObjectView FileEntry = It.AsObjectView();
- std::string_view Name = FileEntry["name"sv].AsString();
- const IoHash Hash = FileEntry["hash"sv].AsHash();
- const uint64_t Size = FileEntry["size"sv].AsUInt64();
+ std::string_view Name = FileEntry["name"sv].AsString();
+ const IoHash ChunkHash = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
std::filesystem::path FilePath{SandboxPath / Name};
- IoBuffer DataBuffer = m_CasStore.FindChunk(Hash);
+ IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash);
if (!DataBuffer)
{
- throw std::runtime_error("worker CAS chunk '{}' missing"_format(Hash));
+ throw std::runtime_error("worker CAS chunk '{}' missing"_format(ChunkHash));
+ }
+
+ if (DataBuffer.Size() != Size)
+ {
+ throw std::runtime_error("worker CAS chunk '{}' size: {}, action spec expected {}"_format(ChunkHash, DataBuffer.Size(), Size));
}
zen::WriteFile(FilePath, DataBuffer);
@@ -796,11 +806,14 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
CbPackage OutputPackage;
CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]);
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalRawAttachmentBytes = 0;
+
Output.IterateAttachments([&](CbFieldView Field) {
IoHash Hash = Field.AsHash();
std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()};
- FileContents ChunkData = zen::ReadFile(SandboxPath / "build.output");
+ FileContents ChunkData = zen::ReadFile(OutputPath);
if (ChunkData.ErrorCode)
{
@@ -809,12 +822,27 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
ZEN_ASSERT(OutputData.Data.size() == 1);
- CbAttachment Attachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0])));
+ CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0]));
+
+ if (!AttachmentBuffer)
+ {
+ throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)");
+ }
+
+ TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
+ TotalRawAttachmentBytes += AttachmentBuffer.GetCompressedSize();
+
+ CbAttachment Attachment(AttachmentBuffer);
OutputPackage.AddAttachment(Attachment);
});
OutputPackage.SetObject(Output);
+ ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)",
+ OutputPackage.GetAttachments().size(),
+ NiceBytes(TotalAttachmentBytes),
+ NiceBytes(TotalRawAttachmentBytes));
+
return OutputPackage;
}
diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h
index 474156a5e..86b262213 100644
--- a/zenserver/compute/apply.h
+++ b/zenserver/compute/apply.h
@@ -28,6 +28,7 @@ public:
virtual void HandleRequest(HttpServerRequest& Request) override;
private:
+ spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
HttpRequestRouter m_Router;
CasStore& m_CasStore;
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 0af92da6d..4a5467648 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -163,7 +163,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -194,7 +194,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -215,7 +215,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
std::vector<IoHash>
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 97b222a68..38d30a795 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -4,8 +4,14 @@
#include "jupiter.h"
#include "zen.h"
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
#include <zencore/timer.h>
+
#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
@@ -121,7 +127,45 @@ namespace detail {
}
else
{
- Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type);
+ const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type;
+ Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType);
+
+ if (Result.Success && Type == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+
+ const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All);
+ if (Result.Success = ValidationResult == CbValidateError::None; Result.Success)
+ {
+ CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
+
+ CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ {
+ Package.AddAttachment(CbAttachment(Chunk));
+ }
+ else
+ {
+ Result.Success = false;
+ }
+ });
+
+ Package.SetObject(CacheRecord);
+ }
+
+ if (Result.Success)
+ {
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+
+ Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ Result.Bytes = MemStream.Size();
+ }
+ }
}
return {.Value = Result.Response,
@@ -305,37 +349,74 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ if (CacheRecord.Type == ZenContentType::kCbPackage)
{
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ zen::CbPackage Package;
+ Package.SetObject(CbObject(SharedBuffer(RecordValue)));
+
+ for (const IoBuffer& Payload : Payloads)
{
- Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- CacheRecord.PayloadIds[Idx],
- Payloads[Idx]);
+ if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
+ {
+ Package.AddAttachment(CbAttachment(AttachmentBuffer));
+ }
+ else
+ {
+ return {.Reason = std::string("invalid payload buffer"), .Success = false};
+ }
}
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+ IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size());
- if (!Result.Success)
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- return {.Reason = "Failed to upload payload",
- .Bytes = TotalBytes,
- .ElapsedSeconds = TotalElapsedSeconds,
- .Success = false};
+ Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ PackagePayload,
+ CacheRecord.Type);
}
- }
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ TotalBytes = Result.Bytes;
+ TotalElapsedSeconds = Result.ElapsedSeconds;
}
+ else
+ {
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ {
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ CacheRecord.PayloadIds[Idx],
+ Payloads[Idx]);
+ }
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
+ if (!Result.Success)
+ {
+ return {.Reason = "Failed to upload payload",
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = false};
+ }
+ }
+
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result =
+ Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ }
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ }
return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
}
@@ -448,7 +529,6 @@ public:
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
{
- ZEN_ASSERT(m_Options.ThreadCount > 0);
}
virtual ~DefaultUpstreamCache() { Shutdown(); }
@@ -509,7 +589,15 @@ public:
{
if (m_IsRunning.load())
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ if (!m_UpstreamThreads.empty())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ }
+ else
+ {
+ ProcessCacheRecord(std::move(CacheRecord));
+ }
+
return {.Success = true};
}
@@ -548,11 +636,19 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- Result.Success)
+ const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ if (Result.Success)
{
m_Stats.Add(*Endpoint, Result);
}
+ else
+ {
+ ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->DisplayName(),
+ Result.Reason);
+ }
}
}
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index eef92bab4..55ddd310f 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -5,6 +5,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/session.h>
#include <zencore/stream.h>
#include "cache/structuredcachestore.h"
@@ -72,7 +73,7 @@ namespace detail {
// Note that currently this just implements an UDP echo service for testing purposes
-Mesh::Mesh(asio::io_context& IoContext) : m_IoContext(IoContext)
+Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(zen::GetSessionId())
{
}
@@ -257,10 +258,10 @@ Mesh::IssueReceive()
if (SessionId != Oid::Zero && SessionId != m_SessionId)
{
- const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port());
- const uint32_t Lsn = (++It)->AsUInt32();
+ // const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port());
+ // const uint32_t Lsn = (++It)->AsUInt32();
- ZEN_INFO("received hey from {} ({})", SenderIp, SessionId);
+ ZEN_TRACE("received hey from {} ({})", SenderIp, SessionId);
RwLock::ExclusiveLockScope _(m_SessionsLock);
@@ -279,7 +280,7 @@ Mesh::IssueReceive()
{
Oid SessionId = Field.AsObjectId();
- ZEN_INFO("received bye from {} ({})", SenderIp, SessionId);
+ ZEN_DEBUG("received bye from {} ({})", SenderIp, SessionId);
// We could verify that it's sent from a known IP before erasing the
// session, if we want to be paranoid
@@ -389,7 +390,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetHeader(cpr::Header{{"Accept",
+ Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
+ : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
+ : "application/octet-stream"}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -429,14 +433,18 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(
- cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetHeader(cpr::Header{{"Content-Type",
+ Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
+ : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
+ : "application/octet-stream"}});
Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()});
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
ZenCacheResult
@@ -454,7 +462,9 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 541495818..ff4a551bf 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -4,6 +4,7 @@
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
+#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
@@ -59,6 +60,9 @@ private:
static const int kMaxMessageSize = 2048;
static const int kMaxUpdateSize = 1400; // We'll try not to send messages larger than this
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
std::atomic<State> m_State = kInitializing;
asio::io_context& m_IoContext;
std::unique_ptr<asio::ip::udp::socket> m_UdpSocket;
@@ -68,7 +72,7 @@ private:
uint16_t m_Port = 0;
uint8_t m_MessageBuffer[kMaxMessageSize];
asio::high_resolution_timer m_Timer{m_IoContext};
- Oid m_SessionId{Oid::NewOid()};
+ Oid m_SessionId;
struct PeerInfo
{
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 3b56d8683..53dc41a24 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -157,7 +157,7 @@ public:
zen::UpstreamCacheOptions UpstreamOptions;
- if (UpstreamConfig.UpstreamThreadCount > 0 && UpstreamConfig.UpstreamThreadCount < 32)
+ if (UpstreamConfig.UpstreamThreadCount < 32)
{
UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
}
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index e6c7f98ee..100054a0e 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -28,15 +28,16 @@ struct CidStore::CidState
RwLock m_Lock;
tsl::robin_map<IoHash, IoHash> m_CidMap;
- CasStore::InsertResult AddChunk(CompressedBuffer& ChunkData)
+ CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData)
{
- IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer();
- IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+ const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash());
+ IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer();
+ IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash);
- AddCompressedCid(IoHash::FromBLAKE3(ChunkData.GetRawHash()), CompressedHash);
+ AddCompressedCid(DecompressedId, CompressedHash);
- return Result;
+ return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New};
}
void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
@@ -103,7 +104,7 @@ CidStore::~CidStore()
{
}
-CasStore::InsertResult
+CidStore::InsertResult
CidStore::AddChunk(CompressedBuffer& ChunkData)
{
return m_Impl->AddChunk(ChunkData);
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index 62d642ad1..76a33c915 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -31,11 +31,18 @@ public:
CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir);
~CidStore();
- CasStore::InsertResult AddChunk(CompressedBuffer& ChunkData);
- void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed);
- IoBuffer FindChunkByCid(const IoHash& DecompressedId);
- bool ContainsChunk(const IoHash& DecompressedId);
- void Flush();
+ struct InsertResult
+ {
+ IoHash DecompressedId;
+ IoHash CompressedHash;
+ bool New = false;
+ };
+
+ InsertResult AddChunk(CompressedBuffer& ChunkData);
+ void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId);
+ bool ContainsChunk(const IoHash& DecompressedId);
+ void Flush();
// TODO: add batch filter support
diff --git a/zenutil/include/zenserverprocess.h b/zenutil/include/zenserverprocess.h
index b81d61c25..7b41c8aba 100644
--- a/zenutil/include/zenserverprocess.h
+++ b/zenutil/include/zenserverprocess.h
@@ -51,7 +51,7 @@ struct ZenServerInstance
m_TestDir = TestDir;
}
- void SpawnServer(int BasePort = 0);
+ void SpawnServer(int BasePort = 0, std::string_view AdditionalServerArgs = std::string_view());
void AttachToRunningServer(int BasePort = 0);
diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp
index 00f5d4c0d..7f4be2368 100644
--- a/zenutil/zenserverprocess.cpp
+++ b/zenutil/zenserverprocess.cpp
@@ -367,7 +367,7 @@ ZenServerInstance::Shutdown()
}
void
-ZenServerInstance::SpawnServer(int BasePort)
+ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerArgs)
{
ZEN_ASSERT(!m_Process.IsValid()); // Only spawn once
@@ -386,7 +386,7 @@ ZenServerInstance::SpawnServer(int BasePort)
zen::ExtendableStringBuilder<32> LogId;
LogId << "Zen" << ChildId;
- zen::ExtendableWideStringBuilder<128> CommandLine;
+ zen::ExtendableWideStringBuilder<512> CommandLine;
CommandLine << "\"";
CommandLine.Append(Executable.c_str());
CommandLine << "\"";
@@ -417,6 +417,11 @@ ZenServerInstance::SpawnServer(int BasePort)
CommandLine << " --mesh";
}
+ if (!AdditionalServerArgs.empty())
+ {
+ CommandLine << " " << AdditionalServerArgs;
+ }
+
std::filesystem::path CurrentDirectory = std::filesystem::current_path();
ZEN_DEBUG("Spawning server '{}'", LogId);