diff options
| author | Martin Ridgers <[email protected]> | 2021-09-16 17:08:01 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-09-16 17:08:01 +0200 |
| commit | 8da2c13a34fd6394aecaf19490d65a8a84592e3c (patch) | |
| tree | 702cb3aec8145209fb5d8e39d8bf6d1432dd1a33 /zenserver/upstream/upstreamcache.cpp | |
| parent | Another missing include (diff) | |
| parent | Compact binary package caching support (#9) (diff) | |
| download | zen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.tar.xz zen-8da2c13a34fd6394aecaf19490d65a8a84592e3c.zip | |
Merge main into linux-mac
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 148 |
1 files changed, 122 insertions, 26 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 97b222a68..38d30a795 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -4,8 +4,14 @@ #include "jupiter.h" #include "zen.h" +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/stream.h> #include <zencore/timer.h> + #include <zenstore/cas.h> #include <zenstore/cidstore.h> @@ -121,7 +127,45 @@ namespace detail { } else { - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type); + const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; + Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); + + if (Result.Success && Type == ZenContentType::kCbPackage) + { + CbPackage Package; + + const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All); + if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) + { + CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); + + CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } + else + { + Result.Success = false; + } + }); + + Package.SetObject(CacheRecord); + } + + if (Result.Success) + { + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + + Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + Result.Bytes = MemStream.Size(); + } + } } return {.Value = Result.Response, @@ -305,37 +349,74 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + if (CacheRecord.Type == ZenContentType::kCbPackage) { - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + zen::CbPackage Package; + Package.SetObject(CbObject(SharedBuffer(RecordValue))); + + for (const IoBuffer& Payload : Payloads) { - Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - CacheRecord.PayloadIds[Idx], - Payloads[Idx]); + if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) + { + Package.AddAttachment(CbAttachment(AttachmentBuffer)); + } + else + { + return {.Reason = std::string("invalid payload buffer"), .Success = false}; + } } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size()); - if (!Result.Success) + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - return {.Reason = "Failed to upload payload", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; + Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + PackagePayload, + CacheRecord.Type); } - } - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + TotalBytes = Result.Bytes; + TotalElapsedSeconds = Result.ElapsedSeconds; } + else + { + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + { + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + CacheRecord.PayloadIds[Idx], + Payloads[Idx]); + } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + if (!Result.Success) + { + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; + } + } + + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = + Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + } + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + } return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } @@ -448,7 +529,6 @@ public: , m_CacheStore(CacheStore) , m_CidStore(CidStore) { - ZEN_ASSERT(m_Options.ThreadCount > 0); } virtual ~DefaultUpstreamCache() { Shutdown(); } @@ -509,7 +589,15 @@ public: { if (m_IsRunning.load()) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + if (!m_UpstreamThreads.empty()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + } + else + { + ProcessCacheRecord(std::move(CacheRecord)); + } + return {.Success = true}; } @@ -548,11 +636,19 @@ private: for (auto& Endpoint : m_Endpoints) { - if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - Result.Success) + const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (Result.Success) { m_Stats.Add(*Endpoint, Result); } + else + { + ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } |