aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-09-16 17:08:01 +0200
committerMartin Ridgers <[email protected]>2021-09-16 17:08:01 +0200
commit8da2c13a34fd6394aecaf19490d65a8a84592e3c (patch)
tree702cb3aec8145209fb5d8e39d8bf6d1432dd1a33 /zenserver/upstream/upstreamcache.cpp
parentAnother missing include (diff)
parentCompact binary package caching support (#9) (diff)
downloadzen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.tar.xz
zen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.zip
Merge main into linux-mac
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp148
1 files changed, 122 insertions, 26 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 97b222a68..38d30a795 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -4,8 +4,14 @@
#include "jupiter.h"
#include "zen.h"
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
#include <zencore/timer.h>
+
#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
@@ -121,7 +127,45 @@ namespace detail {
}
else
{
- Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type);
+ const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type;
+ Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType);
+
+ if (Result.Success && Type == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+
+ const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All);
+ if (Result.Success = ValidationResult == CbValidateError::None; Result.Success)
+ {
+ CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
+
+ CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ {
+ Package.AddAttachment(CbAttachment(Chunk));
+ }
+ else
+ {
+ Result.Success = false;
+ }
+ });
+
+ Package.SetObject(CacheRecord);
+ }
+
+ if (Result.Success)
+ {
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+
+ Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ Result.Bytes = MemStream.Size();
+ }
+ }
}
return {.Value = Result.Response,
@@ -305,37 +349,74 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ if (CacheRecord.Type == ZenContentType::kCbPackage)
{
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ zen::CbPackage Package;
+ Package.SetObject(CbObject(SharedBuffer(RecordValue)));
+
+ for (const IoBuffer& Payload : Payloads)
{
- Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- CacheRecord.PayloadIds[Idx],
- Payloads[Idx]);
+ if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
+ {
+ Package.AddAttachment(CbAttachment(AttachmentBuffer));
+ }
+ else
+ {
+ return {.Reason = std::string("invalid payload buffer"), .Success = false};
+ }
}
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
+ IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size());
- if (!Result.Success)
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- return {.Reason = "Failed to upload payload",
- .Bytes = TotalBytes,
- .ElapsedSeconds = TotalElapsedSeconds,
- .Success = false};
+ Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ PackagePayload,
+ CacheRecord.Type);
}
- }
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ TotalBytes = Result.Bytes;
+ TotalElapsedSeconds = Result.ElapsedSeconds;
}
+ else
+ {
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ {
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ CacheRecord.PayloadIds[Idx],
+ Payloads[Idx]);
+ }
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
+ if (!Result.Success)
+ {
+ return {.Reason = "Failed to upload payload",
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = false};
+ }
+ }
+
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result =
+ Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+ }
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ }
return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
}
@@ -448,7 +529,6 @@ public:
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
{
- ZEN_ASSERT(m_Options.ThreadCount > 0);
}
virtual ~DefaultUpstreamCache() { Shutdown(); }
@@ -509,7 +589,15 @@ public:
{
if (m_IsRunning.load())
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ if (!m_UpstreamThreads.empty())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ }
+ else
+ {
+ ProcessCacheRecord(std::move(CacheRecord));
+ }
+
return {.Success = true};
}
@@ -548,11 +636,19 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- Result.Success)
+ const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ if (Result.Success)
{
m_Stats.Add(*Endpoint, Result);
}
+ else
+ {
+ ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->DisplayName(),
+ Result.Reason);
+ }
}
}