diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-27 10:10:44 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-27 10:10:44 +0200 |
| commit | d821c51769cb5f7d433a9cc242d508553d5bd81d (patch) | |
| tree | a33c00df515631a03a6fab6550780f295ff0009d | |
| parent | Merge remote-tracking branch 'origin/main' into de/use-bulk-fetch-from-upstre... (diff) | |
| download | zen-de/optionally-keep-value-in-memory-when-uploadint-to-remote.tar.xz zen-de/optionally-keep-value-in-memory-when-uploadint-to-remote.zip | |
If we don't have much to do in the upstream store queue, pass the data to store in memoryde/optionally-keep-value-in-memory-when-uploadint-to-remote
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 17 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 33 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 4 |
3 files changed, 36 insertions, 18 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 8daf08bff..9123dd780 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -490,7 +490,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.BucketSegment, Ref.HashKey}}); + m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.BucketSegment, Ref.HashKey}}, {.Value = Body}); } Request.WriteResponse(HttpResponseCode::Created); @@ -536,7 +536,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + .ValueContentIds = std::move(ValidAttachments)}, + {.Value = Body}); } Request.WriteResponse(HttpResponseCode::Created); @@ -619,7 +620,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + .ValueContentIds = std::move(ValidAttachments)}, + std::move(CacheValue)); } Request.WriteResponse(HttpResponseCode::Created); @@ -1020,7 +1022,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); + {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}, + std::move(CacheValue)); } return PutResult::Success; } @@ -1370,6 +1373,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ bool Succeeded = false; uint64_t TransferredSize = 0; + IoBuffer Value; if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) { if (Attachment->IsCompressedBinary()) @@ -1384,7 +1388,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Value}); TransferredSize = Chunk.GetCompressedSize(); @@ -1402,6 +1406,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ ZenCacheValue ExistingValue; if (m_CacheStore.Get(Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { + Value = ExistingValue.Value; Succeeded = true; } } @@ -1410,7 +1415,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}, {.Value = Value}); } Results.push_back(Succeeded); ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid"); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index da0743f0a..a0b04fe06 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -1449,17 +1449,19 @@ public: return {}; } - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, ZenCacheValue&& OptionalCacheValue) override { if (m_RunState.IsRunning && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + m_UpstreamQueue.Enqueue( + {std::move(CacheRecord), + m_UpstreamQueue.Size() < m_UpstreamThreads.size() ? std::move(OptionalCacheValue) : ZenCacheValue{}}); } else { - ProcessCacheRecord(std::move(CacheRecord)); + ProcessCacheRecord(std::move(CacheRecord), std::move(OptionalCacheValue)); } } } @@ -1510,14 +1512,14 @@ public: } private: - void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + void ProcessCacheRecord(UpstreamCacheRecord CacheRecord, ZenCacheValue&& OptionalCacheValue) { ZEN_TRACE_CPU("Upstream::ProcessCacheRecord"); - ZenCacheValue CacheValue; + ZenCacheValue CacheValue = std::move(OptionalCacheValue); std::vector<IoBuffer> Payloads; - if (!m_CacheStore.Get(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!CacheValue.Value && !m_CacheStore.Get(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1572,16 +1574,19 @@ private: { for (;;) { - UpstreamCacheRecord CacheRecord; - if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) + UpstreamCacheRecordWithData CacheRecordWithData; + if (m_UpstreamQueue.WaitAndDequeue(CacheRecordWithData)) { try { - ProcessCacheRecord(std::move(CacheRecord)); + ProcessCacheRecord(std::move(CacheRecordWithData.CacheRecord), std::move(CacheRecordWithData.CacheValue)); } catch (std::exception& Err) { - ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what()); + ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", + CacheRecordWithData.CacheRecord.Key.Bucket, + CacheRecordWithData.CacheRecord.Key.Hash, + Err.what()); } } @@ -1662,7 +1667,13 @@ private: spdlog::logger& Log() { return m_Log; } - using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; + struct UpstreamCacheRecordWithData + { + UpstreamCacheRecord CacheRecord; + ZenCacheValue CacheValue; + }; + + using UpstreamQueue = BlockingQueue<UpstreamCacheRecordWithData>; struct RunState { diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 6f18b3119..27333a686 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -183,6 +183,8 @@ public: AuthMgr& Mgr); }; +struct ZenCacheValue; + /** * Manages one or more upstream cache endpoints. */ @@ -202,7 +204,7 @@ public: virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) = 0; - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, ZenCacheValue&& OptionalCacheValue) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; |