aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-09-16 17:08:01 +0200
committerMartin Ridgers <[email protected]>2021-09-16 17:08:01 +0200
commit8da2c13a34fd6394aecaf19490d65a8a84592e3c (patch)
tree702cb3aec8145209fb5d8e39d8bf6d1432dd1a33 /zenserver/upstream
parentAnother missing include (diff)
parentCompact binary package caching support (#9) (diff)
downloadzen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.tar.xz
zen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.zip
Merge main into linux-mac
Diffstat (limited to 'zenserver/upstream')
-rw-r--r--zenserver/upstream/jupiter.cpp6
-rw-r--r--zenserver/upstream/upstreamcache.cpp148
-rw-r--r--zenserver/upstream/zen.cpp30
-rw-r--r--zenserver/upstream/zen.h6
4 files changed, 150 insertions, 40 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 0af92da6d..4a5467648 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -163,7 +163,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -194,7 +194,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -215,7 +215,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
std::vector<IoHash>
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 97b222a68..38d30a795 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -4,8 +4,14 @@
#include "jupiter.h"
#include "zen.h"
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
#include <zencore/timer.h>
+
#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
@@ -121,7 +127,45 @@ namespace detail {
}
else
{
- Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type);
+ const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type;
+ Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType);
+
+ if (Result.Success && Type == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+
+ const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All);
+ if (Result.Success = ValidationResult == CbValidateError::None; Result.Success)
+ {
+ CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
+
+ CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ {
+ Package.AddAttachment(CbAttachment(Chunk));
+ }
+ else
+ {
+ Result.Success = false;
+ }
+ });
+
+ Package.SetObject(CacheRecord);
+ }
+
+ if (Result.Success)
+ {
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+
+ Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ Result.Bytes = MemStream.Size();
+ }
+ }
}
return {.Value = Result.Response,
@@ -305,37 +349,74 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ if (CacheRecord.Type == ZenContentType::kCbPackage)
{
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ zen::CbPackage Package;
+ Package.SetObject(CbObject(SharedBuffer(RecordValue)));
+
+ for (const IoBuffer& Payload : Payloads)
{
- Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- CacheRecord.PayloadIds[Idx],
- Payloads[Idx]);
+ if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
+ {
+ Package.AddAttachment(CbAttachment(AttachmentBuffer));
+ }
+ else
+ {
+ return {.Reason = std::string("invalid payload buffer"), .Success = false};
+ }
}
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+ IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size());
- if (!Result.Success)
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- return {.Reason = "Failed to upload payload",
- .Bytes = TotalBytes,
- .ElapsedSeconds = TotalElapsedSeconds,
- .Success = false};
+ Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ PackagePayload,
+ CacheRecord.Type);
}
- }
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ TotalBytes = Result.Bytes;
+ TotalElapsedSeconds = Result.ElapsedSeconds;
}
+ else
+ {
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ {
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ CacheRecord.PayloadIds[Idx],
+ Payloads[Idx]);
+ }
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
+ if (!Result.Success)
+ {
+ return {.Reason = "Failed to upload payload",
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = false};
+ }
+ }
+
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result =
+ Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ }
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ }
return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
}
@@ -448,7 +529,6 @@ public:
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
{
- ZEN_ASSERT(m_Options.ThreadCount > 0);
}
virtual ~DefaultUpstreamCache() { Shutdown(); }
@@ -509,7 +589,15 @@ public:
{
if (m_IsRunning.load())
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ if (!m_UpstreamThreads.empty())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ }
+ else
+ {
+ ProcessCacheRecord(std::move(CacheRecord));
+ }
+
return {.Success = true};
}
@@ -548,11 +636,19 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- Result.Success)
+ const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ if (Result.Success)
{
m_Stats.Add(*Endpoint, Result);
}
+ else
+ {
+ ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->DisplayName(),
+ Result.Reason);
+ }
}
}
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index eef92bab4..55ddd310f 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -5,6 +5,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/session.h>
#include <zencore/stream.h>
#include "cache/structuredcachestore.h"
@@ -72,7 +73,7 @@ namespace detail {
// Note that currently this just implements an UDP echo service for testing purposes
-Mesh::Mesh(asio::io_context& IoContext) : m_IoContext(IoContext)
+Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(zen::GetSessionId())
{
}
@@ -257,10 +258,10 @@ Mesh::IssueReceive()
if (SessionId != Oid::Zero && SessionId != m_SessionId)
{
- const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port());
- const uint32_t Lsn = (++It)->AsUInt32();
+ // const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port());
+ // const uint32_t Lsn = (++It)->AsUInt32();
- ZEN_INFO("received hey from {} ({})", SenderIp, SessionId);
+ ZEN_TRACE("received hey from {} ({})", SenderIp, SessionId);
RwLock::ExclusiveLockScope _(m_SessionsLock);
@@ -279,7 +280,7 @@ Mesh::IssueReceive()
{
Oid SessionId = Field.AsObjectId();
- ZEN_INFO("received bye from {} ({})", SenderIp, SessionId);
+ ZEN_DEBUG("received bye from {} ({})", SenderIp, SessionId);
// We could verify that it's sent from a known IP before erasing the
// session, if we want to be paranoid
@@ -389,7 +390,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetHeader(cpr::Header{{"Accept",
+ Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
+ : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
+ : "application/octet-stream"}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -429,14 +433,18 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(
- cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetHeader(cpr::Header{{"Content-Type",
+ Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
+ : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
+ : "application/octet-stream"}});
Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()});
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
ZenCacheResult
@@ -454,7 +462,9 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 541495818..ff4a551bf 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -4,6 +4,7 @@
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
+#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
@@ -59,6 +60,9 @@ private:
static const int kMaxMessageSize = 2048;
static const int kMaxUpdateSize = 1400; // We'll try not to send messages larger than this
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
std::atomic<State> m_State = kInitializing;
asio::io_context& m_IoContext;
std::unique_ptr<asio::ip::udp::socket> m_UdpSocket;
@@ -68,7 +72,7 @@ private:
uint16_t m_Port = 0;
uint8_t m_MessageBuffer[kMaxMessageSize];
asio::high_resolution_timer m_Timer{m_IoContext};
- Oid m_SessionId{Oid::NewOid()};
+ Oid m_SessionId;
struct PeerInfo
{