aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-09-16 16:21:16 +0200
committerGitHub <[email protected]>2021-09-16 16:21:16 +0200
commitb166d4081655c4c181ed915ec5475ed535c67a9d (patch)
tree2a7d6dd512dc3cfb106442f656bdc65a2650dcac /zenserver/upstream
parentclang-format fixes (diff)
downloadzen-b166d4081655c4c181ed915ec5475ed535c67a9d.tar.xz
zen-b166d4081655c4c181ed915ec5475ed535c67a9d.zip
Compact binary package caching support (#9)
Diffstat (limited to 'zenserver/upstream')
-rw-r--r--zenserver/upstream/jupiter.cpp6
-rw-r--r--zenserver/upstream/upstreamcache.cpp148
-rw-r--r--zenserver/upstream/zen.cpp19
3 files changed, 139 insertions, 34 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 0af92da6d..4a5467648 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -163,7 +163,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -194,7 +194,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -215,7 +215,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
std::vector<IoHash>
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 97b222a68..38d30a795 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -4,8 +4,14 @@
#include "jupiter.h"
#include "zen.h"
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
#include <zencore/timer.h>
+
#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
@@ -121,7 +127,45 @@ namespace detail {
}
else
{
- Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type);
+ const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type;
+ Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType);
+
+ if (Result.Success && Type == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+
+ const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All);
+ if (Result.Success = ValidationResult == CbValidateError::None; Result.Success)
+ {
+ CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
+
+ CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ {
+ Package.AddAttachment(CbAttachment(Chunk));
+ }
+ else
+ {
+ Result.Success = false;
+ }
+ });
+
+ Package.SetObject(CacheRecord);
+ }
+
+ if (Result.Success)
+ {
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+
+ Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ Result.Bytes = MemStream.Size();
+ }
+ }
}
return {.Value = Result.Response,
@@ -305,37 +349,74 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ if (CacheRecord.Type == ZenContentType::kCbPackage)
{
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ zen::CbPackage Package;
+ Package.SetObject(CbObject(SharedBuffer(RecordValue)));
+
+ for (const IoBuffer& Payload : Payloads)
{
- Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- CacheRecord.PayloadIds[Idx],
- Payloads[Idx]);
+ if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
+ {
+ Package.AddAttachment(CbAttachment(AttachmentBuffer));
+ }
+ else
+ {
+ return {.Reason = std::string("invalid payload buffer"), .Success = false};
+ }
}
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+ IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size());
- if (!Result.Success)
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- return {.Reason = "Failed to upload payload",
- .Bytes = TotalBytes,
- .ElapsedSeconds = TotalElapsedSeconds,
- .Success = false};
+ Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ PackagePayload,
+ CacheRecord.Type);
}
- }
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ TotalBytes = Result.Bytes;
+ TotalElapsedSeconds = Result.ElapsedSeconds;
}
+ else
+ {
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ {
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ CacheRecord.PayloadIds[Idx],
+ Payloads[Idx]);
+ }
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
+ if (!Result.Success)
+ {
+ return {.Reason = "Failed to upload payload",
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = false};
+ }
+ }
+
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result =
+ Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ }
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ }
return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
}
@@ -448,7 +529,6 @@ public:
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
{
- ZEN_ASSERT(m_Options.ThreadCount > 0);
}
virtual ~DefaultUpstreamCache() { Shutdown(); }
@@ -509,7 +589,15 @@ public:
{
if (m_IsRunning.load())
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ if (!m_UpstreamThreads.empty())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ }
+ else
+ {
+ ProcessCacheRecord(std::move(CacheRecord));
+ }
+
return {.Success = true};
}
@@ -548,11 +636,19 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- Result.Success)
+ const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ if (Result.Success)
{
m_Stats.Add(*Endpoint, Result);
}
+ else
+ {
+ ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->DisplayName(),
+ Result.Reason);
+ }
}
}
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index b553558e7..55ddd310f 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -390,7 +390,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetHeader(cpr::Header{{"Accept",
+ Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
+ : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
+ : "application/octet-stream"}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -430,14 +433,18 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(
- cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetHeader(cpr::Header{{"Content-Type",
+ Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
+ : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
+ : "application/octet-stream"}});
Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()});
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
ZenCacheResult
@@ -455,7 +462,9 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
} // namespace zen