aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-27 10:10:44 +0200
committerDan Engelbrecht <[email protected]>2022-04-27 10:10:44 +0200
commitd821c51769cb5f7d433a9cc242d508553d5bd81d (patch)
treea33c00df515631a03a6fab6550780f295ff0009d
parentMerge remote-tracking branch 'origin/main' into de/use-bulk-fetch-from-upstre... (diff)
downloadzen-de/optionally-keep-value-in-memory-when-uploadint-to-remote.tar.xz
zen-de/optionally-keep-value-in-memory-when-uploadint-to-remote.zip
If we don't have much to do in the upstream store queue, pass the data to store in memoryde/optionally-keep-value-in-memory-when-uploadint-to-remote
-rw-r--r--zenserver/cache/structuredcache.cpp17
-rw-r--r--zenserver/upstream/upstreamcache.cpp33
-rw-r--r--zenserver/upstream/upstreamcache.h4
3 files changed, 36 insertions, 18 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 8daf08bff..9123dd780 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -490,7 +490,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.BucketSegment, Ref.HashKey}});
+ m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.BucketSegment, Ref.HashKey}}, {.Value = Body});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -536,7 +536,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
.Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
+ .ValueContentIds = std::move(ValidAttachments)},
+ {.Value = Body});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -619,7 +620,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
+ .ValueContentIds = std::move(ValidAttachments)},
+ std::move(CacheValue));
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -1020,7 +1022,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)});
+ {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)},
+ std::move(CacheValue));
}
return PutResult::Success;
}
@@ -1370,6 +1373,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
bool Succeeded = false;
uint64_t TransferredSize = 0;
+ IoBuffer Value;
if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
{
if (Attachment->IsCompressedBinary())
@@ -1384,7 +1388,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
Value.SetContentType(ZenContentType::kCompressedBinary);
m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Value});
TransferredSize = Chunk.GetCompressedSize();
@@ -1402,6 +1406,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
ZenCacheValue ExistingValue;
if (m_CacheStore.Get(Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
+ Value = ExistingValue.Value;
Succeeded = true;
}
}
@@ -1410,7 +1415,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}, {.Value = Value});
}
Results.push_back(Succeeded);
ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid");
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index da0743f0a..a0b04fe06 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -1449,17 +1449,19 @@ public:
return {};
}
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, ZenCacheValue&& OptionalCacheValue) override
{
if (m_RunState.IsRunning && m_Options.WriteUpstream)
{
if (!m_UpstreamThreads.empty())
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ m_UpstreamQueue.Enqueue(
+ {std::move(CacheRecord),
+ m_UpstreamQueue.Size() < m_UpstreamThreads.size() ? std::move(OptionalCacheValue) : ZenCacheValue{}});
}
else
{
- ProcessCacheRecord(std::move(CacheRecord));
+ ProcessCacheRecord(std::move(CacheRecord), std::move(OptionalCacheValue));
}
}
}
@@ -1510,14 +1512,14 @@ public:
}
private:
- void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
+ void ProcessCacheRecord(UpstreamCacheRecord CacheRecord, ZenCacheValue&& OptionalCacheValue)
{
ZEN_TRACE_CPU("Upstream::ProcessCacheRecord");
- ZenCacheValue CacheValue;
+ ZenCacheValue CacheValue = std::move(OptionalCacheValue);
std::vector<IoBuffer> Payloads;
- if (!m_CacheStore.Get(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue))
+ if (!CacheValue.Value && !m_CacheStore.Get(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue))
{
ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash);
return;
@@ -1572,16 +1574,19 @@ private:
{
for (;;)
{
- UpstreamCacheRecord CacheRecord;
- if (m_UpstreamQueue.WaitAndDequeue(CacheRecord))
+ UpstreamCacheRecordWithData CacheRecordWithData;
+ if (m_UpstreamQueue.WaitAndDequeue(CacheRecordWithData))
{
try
{
- ProcessCacheRecord(std::move(CacheRecord));
+ ProcessCacheRecord(std::move(CacheRecordWithData.CacheRecord), std::move(CacheRecordWithData.CacheValue));
}
catch (std::exception& Err)
{
- ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what());
+ ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'",
+ CacheRecordWithData.CacheRecord.Key.Bucket,
+ CacheRecordWithData.CacheRecord.Key.Hash,
+ Err.what());
}
}
@@ -1662,7 +1667,13 @@ private:
spdlog::logger& Log() { return m_Log; }
- using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>;
+ struct UpstreamCacheRecordWithData
+ {
+ UpstreamCacheRecord CacheRecord;
+ ZenCacheValue CacheValue;
+ };
+
+ using UpstreamQueue = BlockingQueue<UpstreamCacheRecordWithData>;
struct RunState
{
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 6f18b3119..27333a686 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -183,6 +183,8 @@ public:
AuthMgr& Mgr);
};
+struct ZenCacheValue;
+
/**
* Manages one or more upstream cache endpoints.
*/
@@ -202,7 +204,7 @@ public:
virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0;
virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) = 0;
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, ZenCacheValue&& OptionalCacheValue) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;