aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--zenserver-test/zenserver-test.cpp220
-rw-r--r--zenserver/cache/structuredcache.cpp430
-rw-r--r--zenserver/compute/apply.cpp2
-rw-r--r--zenserver/upstream/jupiter.cpp6
-rw-r--r--zenserver/upstream/upstreamcache.cpp148
-rw-r--r--zenserver/upstream/zen.cpp19
-rw-r--r--zenserver/zenserver.cpp2
-rw-r--r--zenstore/cidstore.cpp13
-rw-r--r--zenstore/include/zenstore/cidstore.h17
-rw-r--r--zenutil/include/zenserverprocess.h2
-rw-r--r--zenutil/zenserverprocess.cpp9
11 files changed, 701 insertions, 167 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 973ef874a..8a634107d 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -11,6 +11,8 @@
#include <zencore/fmtutils.h>
#include <zencore/iohash.h>
#include <zencore/logging.h>
+#include <zencore/memory.h>
+#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
@@ -34,6 +36,7 @@
#include <filesystem>
#include <map>
#include <random>
+#include <span>
#include <atlbase.h>
#include <process.h>
@@ -1091,7 +1094,7 @@ TEST_CASE("project.pipe")
}
# endif
-TEST_CASE("z$.basic")
+TEST_CASE("zcache.basic")
{
using namespace std::literals;
@@ -1182,6 +1185,221 @@ TEST_CASE("z$.basic")
}
}
+TEST_CASE("zcache.cbpackage")
+{
+ using namespace std::literals;
+
+ auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
+ auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
+ auto CompressedData = zen::CompressedBuffer::Compress(Data);
+
+ OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash());
+
+ zen::CbWriter Obj;
+ Obj.BeginObject("obj"sv);
+ Obj.AddBinaryAttachment("data", OutAttachmentKey);
+ Obj.EndObject();
+
+ zen::CbPackage Package;
+ Package.SetObject(Obj.Save().AsObject());
+ Package.AddAttachment(zen::CbAttachment(CompressedData));
+
+ return Package;
+ };
+
+ auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
+ zen::MemoryOutStream MemStream;
+ zen::BinaryWriter Writer(MemStream);
+
+ Package.Save(Writer);
+
+ return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ };
+
+ auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool {
+ std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments();
+ std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments();
+
+ if (LhsAttachments.size() != LhsAttachments.size())
+ {
+ return false;
+ }
+
+ for (const zen::CbAttachment& LhsAttachment : LhsAttachments)
+ {
+ const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash());
+ CHECK(RhsAttachment);
+
+ zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress();
+ CHECK(!LhsBuffer.IsNull());
+
+ zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress();
+ CHECK(!RhsBuffer.IsNull());
+
+ if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView()))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ };
+
+ SUBCASE("PUT/GET returns correct package")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const uint16_t PortNumber = 13337;
+ const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber);
+
+ ZenServerInstance Instance1(TestEnv);
+ Instance1.SetTestDir(TestDir);
+ Instance1.SpawnServer(PortNumber);
+ Instance1.WaitUntilReady();
+
+ const std::string_view Bucket = "mosdef"sv;
+ zen::IoHash Key;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+
+ // PUT
+ {
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Body.Data(), Body.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ // GET
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Response);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+ }
+
+ SUBCASE("PUT propagates upstream")
+ {
+ // Setup local and remote server
+ std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+ const uint16_t LocalPortNumber = 13337;
+ const uint16_t RemotePortNumber = 13338;
+
+ const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber);
+ const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber);
+
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetTestDir(RemoteDataDir);
+ RemoteInstance.SpawnServer(RemotePortNumber);
+
+ ZenServerInstance LocalInstance(TestEnv);
+ LocalInstance.SetTestDir(LocalDataDir);
+ LocalInstance.SpawnServer(LocalPortNumber,
+ "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber));
+
+ LocalInstance.WaitUntilReady();
+ RemoteInstance.WaitUntilReady();
+
+ const std::string_view Bucket = "mosdef"sv;
+ zen::IoHash Key;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+
+ // Store the cache record package in the local instance
+ {
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Body.Data(), Body.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.status_code == 201);
+ }
+
+ // The cache record can be retrieved as a package from the local instance
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+
+ // The cache record can be retrieved as a package from the remote instance
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+ }
+
+ SUBCASE("GET finds upstream when missing in local")
+ {
+ // Setup local and remote server
+ std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+ const uint16_t LocalPortNumber = 13337;
+ const uint16_t RemotePortNumber = 13338;
+
+ const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber);
+ const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber);
+
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetTestDir(RemoteDataDir);
+ RemoteInstance.SpawnServer(RemotePortNumber);
+
+ ZenServerInstance LocalInstance(TestEnv);
+ LocalInstance.SetTestDir(LocalDataDir);
+ LocalInstance.SpawnServer(LocalPortNumber,
+ "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber));
+
+ LocalInstance.WaitUntilReady();
+ RemoteInstance.WaitUntilReady();
+
+ const std::string_view Bucket = "mosdef"sv;
+ zen::IoHash Key;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+
+ // Store the cache record package in upstream cache
+ {
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Body.Data(), Body.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.status_code == 201);
+ }
+
+ // The cache record can be retrieved as a package from the local cache
+ {
+ cpr::Response Result =
+ cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
+ }
+ }
+}
+
struct RemoteExecutionRequest
{
RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath)
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 9600c5f8a..cf7deaa93 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1,10 +1,12 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zenhttp/httpserver.h>
@@ -15,6 +17,8 @@
#include "upstream/zen.h"
#include "zenstore/cidstore.h"
+#include <zencore/compactbinarypackage.h>
+
#include <algorithm>
#include <atomic>
#include <filesystem>
@@ -37,7 +41,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InC
, m_CidStore(InCidStore)
, m_UpstreamCache(std::move(UpstreamCache))
{
- // m_Log.set_level(spdlog::level::debug);
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -127,14 +130,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
case kHead:
case kGet:
{
+ const ZenContentType AcceptType = Request.AcceptContentType();
+
ZenCacheValue Value;
bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
bool InUpstreamCache = false;
if (!Success && m_UpstreamCache)
{
- const ZenContentType CacheRecordType =
- Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject;
+ const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary
+ : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage
+ : ZenContentType::kCbObject;
if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
UpstreamResult.Success)
@@ -143,43 +149,85 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Success = true;
InUpstreamCache = true;
- if (CacheRecordType == ZenContentType::kCbObject)
+ if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject)
{
- const zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()),
- zen::CbValidateMode::All);
-
- if (ValidationResult == CbValidateError::None)
+ if (CacheRecordType == ZenContentType::kCbObject)
{
- zen::CbObjectView Cbo(UpstreamResult.Value.Data());
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All);
- std::vector<IoHash> References;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ if (ValidationResult == CbValidateError::None)
+ {
+ zen::CbObjectView CacheRecord(UpstreamResult.Value.Data());
- if (!References.empty())
+ zen::CbObjectWriter IndexData;
+ IndexData.BeginArray("references");
+ CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
+ IndexData.EndArray();
+
+ Value.IndexData = IndexData.Save();
+ }
+ else
{
- zen::CbObjectWriter Idx;
- Idx.BeginArray("references");
- for (const IoHash& Hash : References)
- {
- Idx.AddHash(Hash);
- }
- Idx.EndArray();
-
- Value.IndexData = Idx.Save();
+ Success = false;
+ ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream",
+ Ref.BucketSegment,
+ Ref.HashKey);
}
}
- else
+
+ if (Success)
{
- Value.Value = IoBuffer();
- Success = false;
- ZEN_WARN("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
}
}
-
- if (Success)
+ else
{
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage);
+
+ CbPackage Package;
+ if (Package.TryLoad(UpstreamResult.Value))
+ {
+ uint32_t AttachmentCount = 0;
+ uint32_t FoundCount = 0;
+ CbObject CacheRecord = Package.GetObject();
+
+ CacheRecord.IterateAttachments(
+ [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
+ {
+ m_CidStore.AddChunk(Chunk);
+ FoundCount++;
+ }
+ else
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
+ }
+ AttachmentCount++;
+ });
+
+ if (FoundCount == AttachmentCount)
+ {
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey);
+ }
}
}
}
@@ -196,14 +244,74 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Request.SetSuppressResponseBody();
}
- ZEN_DEBUG("HIT - '{}/{}' ({} bytes {}) ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- Value.Value.Size(),
- Value.Value.GetContentType(),
- InUpstreamCache ? "upstream" : "local");
+ if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache)
+ {
+ CbObjectView CacheRecord(Value.Value.Data());
+
+ const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All);
+
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey);
+
+ return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
+ }
+
+ uint32_t AttachmentCount = 0;
+ uint32_t FoundCount = 0;
+ uint64_t AttachmentBytes = 0ull;
+
+ CbPackage Package;
+
+ CacheRecord.IterateAttachments(
+ [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ AttachmentBytes += Chunk.Size();
+ FoundCount++;
+ }
+ AttachmentCount++;
+ });
+
+ if (FoundCount != AttachmentCount)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ FoundCount,
+ AttachmentCount);
+
+ return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ }
+
+ Package.SetObject(LoadCompactBinaryObject(Value.Value));
+
+ ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(AttachmentBytes + Value.Value.Size()),
+ AttachmentCount,
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
+
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+
+ IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
+ }
+ else
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} ({})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Value.Value.Size()),
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
+
+ return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ }
}
break;
@@ -218,28 +326,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
const HttpContentType ContentType = Request.RequestContentType();
- bool IsCompactBinary = false;
-
- switch (ContentType)
- {
- case HttpContentType::kUnknownContentType:
- case HttpContentType::kBinary:
- IsCompactBinary = false;
- break;
-
- case HttpContentType::kCbObject:
- IsCompactBinary = true;
- break;
-
- default:
- return Request.WriteResponse(zen::HttpResponseCode::BadRequest);
- }
-
- if (!IsCompactBinary)
+ if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType)
{
// TODO: create a cache record and put value in CAS?
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- ZEN_DEBUG("PUT (binary) - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Body.Size(), Body.GetContentType());
+ ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()));
if (m_UpstreamCache)
{
@@ -249,86 +340,193 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
return Request.WriteResponse(zen::HttpResponseCode::Created);
}
-
- // Validate payload before accessing it
- const zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
-
- if (ValidationResult != CbValidateError::None)
+ else if (ContentType == HttpContentType::kCbObject)
{
- ZEN_WARN("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size());
+ // Validate payload before accessing it
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
- // TODO: add details in response, kText || kCbObject?
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Compact binary validation failed"sv);
- }
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Body.Size());
- // Extract referenced payload hashes
- zen::CbObjectView Cbo(Body.Data());
+ // TODO: add details in response, kText || kCbObject?
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Compact binary validation failed"sv);
+ }
- std::vector<IoHash> References;
- std::vector<IoHash> MissingRefs;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ // Extract referenced payload hashes
+ zen::CbObjectView Cbo(Body.Data());
- ZenCacheValue CacheValue;
- CacheValue.Value = Body;
+ std::vector<IoHash> References;
+ std::vector<IoHash> MissingRefs;
+ Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
- if (!References.empty())
- {
- zen::CbObjectWriter Idx;
- Idx.BeginArray("references");
+ ZenCacheValue CacheValue;
+ CacheValue.Value = Body;
- for (const IoHash& Hash : References)
+ if (!References.empty())
{
- Idx.AddHash(Hash);
- if (!m_CidStore.ContainsChunk(Hash))
+ zen::CbObjectWriter Idx;
+ Idx.BeginArray("references");
+
+ for (const IoHash& Hash : References)
{
- MissingRefs.push_back(Hash);
+ Idx.AddHash(Hash);
+ if (!m_CidStore.ContainsChunk(Hash))
+ {
+ MissingRefs.push_back(Hash);
+ }
}
+
+ Idx.EndArray();
+
+ CacheValue.IndexData = Idx.Save();
}
- Idx.EndArray();
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
- CacheValue.IndexData = Idx.Save();
- }
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceBytes(CacheValue.Value.Size()),
+ MissingRefs.size(),
+ References.size());
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ if (MissingRefs.empty())
+ {
+ // Only enqueue valid cache records, i.e. all referenced payloads exists
+ if (m_UpstreamCache)
+ {
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(References)});
+ }
- ZEN_DEBUG("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))",
- Ref.BucketSegment,
- Ref.HashKey,
- CacheValue.Value.Size(),
- CacheValue.Value.GetContentType(),
- References.size(),
- MissingRefs.size());
+ return Request.WriteResponse(zen::HttpResponseCode::Created);
+ }
+ else
+ {
+ // TODO: Binary attachments?
+ zen::CbObjectWriter Response;
+ Response.BeginArray("needs");
+ for (const IoHash& MissingRef : MissingRefs)
+ {
+ Response.AddHash(MissingRef);
+ ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
+ }
+ Response.EndArray();
- if (MissingRefs.empty())
+ // Return Created | BadRequest?
+ return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save());
+ }
+ }
+ else if (ContentType == HttpContentType::kCbPackage)
{
- // Only enqueue valid cache records, i.e. all referenced payloads exists
+ CbPackage Package;
+
+ if (!Package.TryLoad(Body))
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey);
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
+ }
+
+ CbObject CacheRecord = Package.GetObject();
+
+ int32_t AttachmentCount = 0;
+ int32_t NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+ bool AttachmentsOk = true;
+
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+
+ std::vector<IoHash> PayloadIds;
+ PayloadIds.reserve(Attachments.size());
+
+ CacheRecord.IterateAttachments([this,
+ &Ref,
+ &Package,
+ &AttachmentsOk,
+ &AttachmentCount,
+ &TotalAttachmentBytes,
+ &TotalNewBytes,
+ &NewAttachmentCount,
+ &PayloadIds](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ const uint64_t ChunkSize = Chunk.GetCompressed().GetSize();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+
+ PayloadIds.emplace_back(InsertResult.DecompressedId);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += ChunkSize;
+ ++NewAttachmentCount;
+ }
+
+ TotalAttachmentBytes += ChunkSize;
+ AttachmentCount++;
+ }
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
+ AttachmentsOk = false;
+ }
+ }
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
+ AttachmentsOk = false;
+ }
+ });
+
+ if (!AttachmentsOk)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments");
+ }
+
+ IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer();
+ const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size();
+
+ ZenCacheValue CacheValue{.Value = CacheRecordChunk};
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+
if (m_UpstreamCache)
{
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(References)});
+ .PayloadIds = std::move(PayloadIds)});
}
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ zen::NiceBytes(TotalPackageBytes),
+ NewAttachmentCount,
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ zen::NiceBytes(TotalAttachmentBytes));
+
return Request.WriteResponse(zen::HttpResponseCode::Created);
}
else
{
- // TODO: Binary attachments?
- zen::CbObjectWriter Response;
- Response.BeginArray("needs");
- for (const IoHash& MissingRef : MissingRefs)
- {
- Response.AddHash(MissingRef);
- ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
- }
- Response.EndArray();
-
- // Return Created | BadRequest?
- return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save());
+ return Request.WriteResponse(zen::HttpResponseCode::BadRequest);
}
}
break;
@@ -387,13 +585,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
return Request.WriteResponse(zen::HttpResponseCode::NotFound);
}
- ZEN_DEBUG("HIT - '{}/{}/{}' ({} bytes, {}) ({})",
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})",
Ref.BucketSegment,
Ref.HashKey,
Ref.PayloadId,
- Payload.Size(),
+ NiceBytes(Payload.Size()),
Payload.GetContentType(),
- InUpstreamCache ? "upstream" : "local");
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
if (Verb == kHead)
{
@@ -438,13 +636,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
- ZEN_DEBUG("PUT ({}) - '{}/{}/{}' ({} bytes, {})",
- Result.New ? "NEW" : "OLD",
+ ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}",
Ref.BucketSegment,
Ref.HashKey,
Ref.PayloadId,
- Body.Size(),
- Body.GetContentType());
+ NiceBytes(Body.Size()),
+ Body.GetContentType(),
+ Result.New ? "NEW" : "OLD");
if (Result.New)
{
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index e40c6918d..3197eaee4 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -588,7 +588,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
TotalAttachmentBytes += CompressedSize;
++AttachmentCount;
- const CasStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView);
+ const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView);
if (InsertResult.New)
{
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 0af92da6d..4a5467648 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -163,7 +163,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -194,7 +194,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -215,7 +215,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
std::vector<IoHash>
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 97b222a68..38d30a795 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -4,8 +4,14 @@
#include "jupiter.h"
#include "zen.h"
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
#include <zencore/timer.h>
+
#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
@@ -121,7 +127,45 @@ namespace detail {
}
else
{
- Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type);
+ const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type;
+ Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType);
+
+ if (Result.Success && Type == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+
+ const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All);
+ if (Result.Success = ValidationResult == CbValidateError::None; Result.Success)
+ {
+ CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
+
+ CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ {
+ Package.AddAttachment(CbAttachment(Chunk));
+ }
+ else
+ {
+ Result.Success = false;
+ }
+ });
+
+ Package.SetObject(CacheRecord);
+ }
+
+ if (Result.Success)
+ {
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+
+ Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ Result.Bytes = MemStream.Size();
+ }
+ }
}
return {.Value = Result.Response,
@@ -305,37 +349,74 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ if (CacheRecord.Type == ZenContentType::kCbPackage)
{
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ zen::CbPackage Package;
+ Package.SetObject(CbObject(SharedBuffer(RecordValue)));
+
+ for (const IoBuffer& Payload : Payloads)
{
- Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- CacheRecord.PayloadIds[Idx],
- Payloads[Idx]);
+ if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
+ {
+ Package.AddAttachment(CbAttachment(AttachmentBuffer));
+ }
+ else
+ {
+ return {.Reason = std::string("invalid payload buffer"), .Success = false};
+ }
}
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+ IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size());
- if (!Result.Success)
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- return {.Reason = "Failed to upload payload",
- .Bytes = TotalBytes,
- .ElapsedSeconds = TotalElapsedSeconds,
- .Success = false};
+ Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ PackagePayload,
+ CacheRecord.Type);
}
- }
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ TotalBytes = Result.Bytes;
+ TotalElapsedSeconds = Result.ElapsedSeconds;
}
+ else
+ {
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ {
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ CacheRecord.PayloadIds[Idx],
+ Payloads[Idx]);
+ }
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
+ if (!Result.Success)
+ {
+ return {.Reason = "Failed to upload payload",
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = false};
+ }
+ }
+
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result =
+ Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ }
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ }
return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
}
@@ -448,7 +529,6 @@ public:
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
{
- ZEN_ASSERT(m_Options.ThreadCount > 0);
}
virtual ~DefaultUpstreamCache() { Shutdown(); }
@@ -509,7 +589,15 @@ public:
{
if (m_IsRunning.load())
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ if (!m_UpstreamThreads.empty())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ }
+ else
+ {
+ ProcessCacheRecord(std::move(CacheRecord));
+ }
+
return {.Success = true};
}
@@ -548,11 +636,19 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- Result.Success)
+ const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ if (Result.Success)
{
m_Stats.Add(*Endpoint, Result);
}
+ else
+ {
+ ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->DisplayName(),
+ Result.Reason);
+ }
}
}
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index b553558e7..55ddd310f 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -390,7 +390,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetHeader(cpr::Header{{"Accept",
+ Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
+ : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
+ : "application/octet-stream"}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -430,14 +433,18 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(
- cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetHeader(cpr::Header{{"Content-Type",
+ Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
+ : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
+ : "application/octet-stream"}});
Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()});
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
ZenCacheResult
@@ -455,7 +462,9 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
} // namespace zen
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index aa4a42fd7..f1960ab36 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -166,7 +166,7 @@ public:
zen::UpstreamCacheOptions UpstreamOptions;
- if (UpstreamConfig.UpstreamThreadCount > 0 && UpstreamConfig.UpstreamThreadCount < 32)
+ if (UpstreamConfig.UpstreamThreadCount < 32)
{
UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
}
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index e6c7f98ee..100054a0e 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -28,15 +28,16 @@ struct CidStore::CidState
RwLock m_Lock;
tsl::robin_map<IoHash, IoHash> m_CidMap;
- CasStore::InsertResult AddChunk(CompressedBuffer& ChunkData)
+ CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData)
{
- IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer();
- IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+ const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash());
+ IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer();
+ IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash);
- AddCompressedCid(IoHash::FromBLAKE3(ChunkData.GetRawHash()), CompressedHash);
+ AddCompressedCid(DecompressedId, CompressedHash);
- return Result;
+ return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New};
}
void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
@@ -103,7 +104,7 @@ CidStore::~CidStore()
{
}
-CasStore::InsertResult
+CidStore::InsertResult
CidStore::AddChunk(CompressedBuffer& ChunkData)
{
return m_Impl->AddChunk(ChunkData);
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index 62d642ad1..76a33c915 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -31,11 +31,18 @@ public:
CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir);
~CidStore();
- CasStore::InsertResult AddChunk(CompressedBuffer& ChunkData);
- void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed);
- IoBuffer FindChunkByCid(const IoHash& DecompressedId);
- bool ContainsChunk(const IoHash& DecompressedId);
- void Flush();
+ struct InsertResult
+ {
+ IoHash DecompressedId;
+ IoHash CompressedHash;
+ bool New = false;
+ };
+
+ InsertResult AddChunk(CompressedBuffer& ChunkData);
+ void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId);
+ bool ContainsChunk(const IoHash& DecompressedId);
+ void Flush();
// TODO: add batch filter support
diff --git a/zenutil/include/zenserverprocess.h b/zenutil/include/zenserverprocess.h
index 7fcacf788..b659f6e58 100644
--- a/zenutil/include/zenserverprocess.h
+++ b/zenutil/include/zenserverprocess.h
@@ -55,7 +55,7 @@ struct ZenServerInstance
m_TestDir = TestDir;
}
- void SpawnServer(int BasePort = 0);
+ void SpawnServer(int BasePort = 0, std::string_view AdditionalServerArgs = std::string_view());
void AttachToRunningServer(int BasePort = 0);
diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp
index 5142c6a54..2f2b3bd33 100644
--- a/zenutil/zenserverprocess.cpp
+++ b/zenutil/zenserverprocess.cpp
@@ -395,7 +395,7 @@ ZenServerInstance::Shutdown()
}
void
-ZenServerInstance::SpawnServer(int BasePort)
+ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerArgs)
{
ZEN_ASSERT(!m_Process.IsValid()); // Only spawn once
@@ -414,7 +414,7 @@ ZenServerInstance::SpawnServer(int BasePort)
zen::ExtendableStringBuilder<32> LogId;
LogId << "Zen" << ChildId;
- zen::ExtendableWideStringBuilder<128> CommandLine;
+ zen::ExtendableWideStringBuilder<512> CommandLine;
CommandLine << "\"";
CommandLine.Append(Executable.c_str());
CommandLine << "\"";
@@ -455,6 +455,11 @@ ZenServerInstance::SpawnServer(int BasePort)
CommandLine << " --mesh";
}
+ if (!AdditionalServerArgs.empty())
+ {
+ CommandLine << " " << AdditionalServerArgs;
+ }
+
std::filesystem::path CurrentDirectory = std::filesystem::current_path();
ZEN_DEBUG("Spawning server '{}'", LogId);