aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/structuredcache.cpp358
1 files changed, 358 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 50ecd41c2..a63a7d719 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -17,6 +17,7 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
#include <zenutil/cache/cache.h>
+#include <zenutil/zenserverprocess.h>
//#include "cachekey.h"
#include "monitoring/httpstats.h"
@@ -38,6 +39,8 @@
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zenutil/cache/cacherequests.h>
+# include <zencore/workthreadpool.h>
#endif
namespace zen {
@@ -2520,6 +2523,361 @@ TEST_CASE("z$service.parse.relative.Uri")
Invalid));
}
+TEST_CASE("z$.async")
+{
+ ScopedTemporaryDirectory TempDir;
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+ ZenCacheStore CacheStore(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
+
+ ZenServerEnvironment TestEnv;
+ std::filesystem::path ProgramBaseDir = GetRunningExecutablePath().parent_path();
+ std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test";
+
+ TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir);
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+ const uint16_t RemotePortNumber = 13338;
+
+ const auto RemoteBaseUri = fmt::format("http://localhost:{}", RemotePortNumber);
+
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetTestDir(RemoteDataDir);
+ RemoteInstance.SpawnServer(RemotePortNumber);
+ RemoteInstance.WaitUntilReady();
+
+ auto UpstreamCache = UpstreamCache::Create({}, CacheStore, CidStore);
+ std::vector<std::string> UpstreamUrls({RemoteBaseUri});
+ auto ZenUpstreamEndpoint = UpstreamEndpoint::CreateZenEndpoint({.Name = "ZenUpstream",
+ .Urls = UpstreamUrls,
+ .ConnectTimeout = std::chrono::milliseconds(1000),
+ .Timeout = std::chrono::milliseconds(1000)});
+ UpstreamCache->RegisterEndpoint(std::move(ZenUpstreamEndpoint));
+
+ std::atomic_uint32_t WorkCounter;
+ WorkerThreadPool WorkerPool(8);
+ auto AsyncGetLocalRecord = [&WorkerPool, &WorkCounter, &CacheStore](
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const CachePolicy DefaultPolicy,
+ const std::optional<CacheRecordPolicy>& Policy,
+ std::function<void(std::optional<cacherequests::CacheRecord>)>&& OnComplete) {
+ CachePolicy EffectivePolicy = Policy.has_value() ? Policy->GetRecordPolicy() : DefaultPolicy;
+ if (EnumHasAllFlags(EffectivePolicy, CachePolicy::QueryLocal))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork([&WorkCounter, &CacheStore, Namespace, Key, DefaultPolicy, Policy, OnComplete]() {
+ ZenCacheValue CacheValue;
+ if (CacheStore.Get(Namespace, Key.Bucket, Key.Hash, CacheValue))
+ {
+ if (CacheValue.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ CbObjectView Reader(CacheValue.Value.GetData());
+ cacherequests::CacheRecord Record;
+ if (Record.Parse(Reader))
+ {
+ OnComplete(Record);
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ }
+ }
+ OnComplete({});
+ WorkCounter.fetch_sub(1);
+ });
+ return;
+ }
+ OnComplete({});
+ };
+
+ auto AsyncGetLocalChunk = [&WorkerPool, &WorkCounter, &CidStore](
+ Oid Id,
+ IoHash RawHash,
+ uint64_t RawSize,
+ const CachePolicy DefaultPolicy,
+ const std::optional<CacheRecordPolicy>& RecordPolicy,
+ std::function<void(std::optional<cacherequests::CacheValueResult>)>&& OnComplete) {
+ CachePolicy EffectivePolicy = RecordPolicy.has_value() ? RecordPolicy->GetValuePolicy(Id) : DefaultPolicy;
+ if (EnumHasAllFlags(EffectivePolicy, CachePolicy::QueryLocal))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork([&WorkCounter, &CidStore, RawHash, RawSize, OnComplete, EffectivePolicy]() {
+ if (EnumHasAnyFlags(EffectivePolicy, CachePolicy::SkipData))
+ {
+ if (CidStore.ContainsChunk(RawHash))
+ {
+ OnComplete(cacherequests::CacheValueResult{.RawSize = RawSize, .RawHash = RawHash});
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ }
+ IoBuffer Chunk = CidStore.FindChunkByCid(RawHash);
+ if (Chunk)
+ {
+ if (CompressedBuffer Body = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))
+ {
+ OnComplete(cacherequests::CacheValueResult{.RawSize = RawSize, .RawHash = RawHash, .Body = Body});
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ }
+ OnComplete({});
+ WorkCounter.fetch_sub(1);
+ return;
+ });
+ }
+ OnComplete({});
+ };
+
+ auto AsyncGetRemoteRecord = [&WorkerPool, &WorkCounter, &UpstreamCache](
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const CachePolicy DefaultPolicy,
+ const std::optional<CacheRecordPolicy>& Policy,
+ std::function<void(std::optional<cacherequests::GetCacheRecordResult>)>&& OnComplete) {
+ CachePolicy EffectivePolicy = Policy.has_value() ? Policy->GetRecordPolicy() : DefaultPolicy;
+ if (EnumHasAnyFlags(EffectivePolicy, CachePolicy::QueryRemote))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork([&WorkCounter, &UpstreamCache, Namespace, Key, OnComplete]() {
+ CacheRecordPolicy UpstreamPolicy; // TODO, build proper policy
+ CacheKeyRequest UpstreamRequest = {.Key = {.Bucket = Key.Bucket, .Hash = Key.Hash}, .Policy = UpstreamPolicy};
+ std::vector<CacheKeyRequest*> UpstreamRequestPtrs = {&UpstreamRequest};
+ UpstreamCache->GetCacheRecords(Namespace, UpstreamRequestPtrs, [&](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Package)
+ {
+ OnComplete({});
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ cacherequests::CacheRecord Record;
+ if (!Record.Parse(Params.Record))
+ {
+ OnComplete({});
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ cacherequests::GetCacheRecordResult RecordResult = {.Key = Key};
+ RecordResult.Values.resize(Record.Values.size());
+
+ for (size_t ValueIndex = 0; ValueIndex < Record.Values.size(); ++ValueIndex)
+ {
+ RecordResult.Values[ValueIndex].Id = Record.Values[ValueIndex].Id;
+ RecordResult.Values[ValueIndex].RawHash = Record.Values[ValueIndex].RawHash;
+ RecordResult.Values[ValueIndex].RawSize = Record.Values[ValueIndex].RawSize;
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(RecordResult.Values[ValueIndex].RawHash))
+ {
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ {
+ RecordResult.Values[ValueIndex].Body = Compressed;
+ }
+ }
+ }
+ OnComplete(RecordResult);
+ WorkCounter.fetch_sub(1);
+ });
+ });
+ return;
+ }
+ OnComplete({});
+ };
+
+ auto AsyncGetRemoteChunk = [&WorkerPool, &WorkCounter, &UpstreamCache](
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const Oid& Id,
+ const IoHash& RawHash,
+ const uint64_t RawOffset,
+ const uint64_t RawSize,
+ const CachePolicy DefaultPolicy,
+ const std::optional<CacheRecordPolicy>& RecordPolicy,
+ std::function<void(std::optional<cacherequests::CacheValueResult>)>&& OnComplete) {
+ CachePolicy EffectivePolicy = RecordPolicy.has_value() ? RecordPolicy->GetValuePolicy(Id) : DefaultPolicy;
+ if (EnumHasAnyFlags(EffectivePolicy, CachePolicy::QueryRemote))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork(
+ [&WorkCounter, &UpstreamCache, Namespace, Key, RawHash, RawOffset, RawSize, Id, EffectivePolicy, OnComplete]() {
+ CacheChunkRequest UpstreamChunkRequest = {.Key = Key,
+ .ChunkId = RawHash,
+ .ValueId = Id,
+ .RawOffset = RawOffset,
+ .RawSize = RawSize,
+ .Policy = ConvertToUpstream(EffectivePolicy)};
+ std::vector<CacheChunkRequest*> CacheChunkRequestPtrs{&UpstreamChunkRequest};
+ UpstreamCache->GetCacheChunks(
+ Namespace,
+ CacheChunkRequestPtrs,
+ [EffectivePolicy, &OnComplete](CacheChunkGetCompleteParams&& Params) {
+ if (Params.RawHash == Params.RawHash.Zero)
+ {
+ OnComplete({});
+ return;
+ }
+ if (EnumHasAllFlags(EffectivePolicy, CachePolicy::SkipData) &&
+ !EnumHasAllFlags(EffectivePolicy, CachePolicy::StoreLocal))
+ {
+ OnComplete(cacherequests::CacheValueResult{.RawSize = Params.RawSize, .RawHash = Params.RawHash});
+ return;
+ }
+ CompressedBuffer Body =
+ Params.Value ? CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)) : CompressedBuffer{};
+ OnComplete(cacherequests::CacheValueResult{.RawSize = Params.RawSize, .RawHash = Params.RawHash, .Body = Body});
+ });
+ WorkCounter.fetch_sub(1);
+ });
+ return;
+ }
+ OnComplete({});
+ };
+
+ auto AsyncStoreLocalChunk =
+ [&](Oid ValueId, CachePolicy DefaultPolicy, std::optional<CacheRecordPolicy> Policy, CompressedBuffer Body) {
+ CachePolicy EffectivePolicy = Policy.has_value() ? Policy->GetValuePolicy(ValueId) : DefaultPolicy;
+ if (Body && EnumHasAllFlags(EffectivePolicy, CachePolicy::StoreLocal))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork([&WorkCounter, &CidStore, Body]() {
+ CidStore.AddChunk(Body);
+ WorkCounter.fetch_sub(1);
+ });
+ }
+ };
+
+ auto AcceptRemoteChunk = [&](const cacherequests::GetCacheRecordsRequest& Request,
+ cacherequests::GetCacheRecordsResult& Result,
+ size_t Index,
+ Oid ValueId,
+ size_t ValueIndex,
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ if (ValueResult.has_value())
+ {
+ Result.Results[Index]->Values[ValueIndex] = {.Id = ValueId,
+ .RawHash = ValueResult->RawHash,
+ .RawSize = ValueResult->RawSize,
+ .Body = ValueResult->Body};
+ AsyncStoreLocalChunk(ValueId, Request.DefaultPolicy, Request.Requests[Index].Policy, ValueResult->Body);
+ }
+ };
+
+ const cacherequests::GetCacheRecordsRequest Request = {
+ .Namespace = "biff",
+ .Requests = {{.Key = {.Bucket = "bob", .Hash = IoHash::FromHexString("58493681ae8f4c5b38ac58493681ae8f4c5b38ac")}}}};
+ cacherequests::GetCacheRecordsResult Result;
+
+ Result.Results.resize(Request.Requests.size());
+
+ for (size_t Index = 0; Index < Request.Requests.size(); ++Index)
+ {
+ AsyncGetLocalRecord(
+ Request.Namespace,
+ Request.Requests[Index].Key,
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AsyncGetLocalChunk, &AsyncGetRemoteChunk, &AsyncGetRemoteRecord, &AcceptRemoteChunk, &Request, Index, &Result](
+ std::optional<cacherequests::CacheRecord> Record) {
+ if (Record)
+ {
+ Result.Results[Index] = cacherequests::GetCacheRecordResult{.Key = Request.Requests[Index].Key};
+ Result.Results[Index]->Values.resize(Record->Values.size());
+ for (size_t ValueIndex = 0; ValueIndex < Record->Values.size(); ++ValueIndex)
+ {
+ const auto& Value = Record->Values[ValueIndex];
+ AsyncGetLocalChunk(
+ Value.Id,
+ Value.RawHash,
+ Value.RawSize,
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AsyncGetRemoteChunk, &AcceptRemoteChunk, &Request, &Result, Index, ValueIndex, &Value](
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ if (ValueResult)
+ {
+ Result.Results[Index]->Values[ValueIndex] = {.Id = Value.Id,
+ .RawHash = ValueResult->RawHash,
+ .RawSize = ValueResult->RawSize,
+ .Body = ValueResult->Body};
+ return;
+ }
+ AsyncGetRemoteChunk(Request.Namespace,
+ Request.Requests[Index].Key,
+ Value.Id,
+ Value.RawHash,
+ 0,
+ ~uint64_t(0),
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AcceptRemoteChunk, &Request, &Value, &Result, Index, ValueIndex](
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ AcceptRemoteChunk(Request, Result, Index, Value.Id, ValueIndex, ValueResult);
+ });
+ });
+ }
+ }
+ else
+ {
+ AsyncGetRemoteRecord(
+ Request.Namespace,
+ Request.Requests[Index].Key,
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AsyncGetLocalChunk, &AsyncGetRemoteChunk, &AcceptRemoteChunk, &Result, &Request, Index](
+ std::optional<cacherequests::GetCacheRecordResult>&& RecordResult) {
+ Result.Results[Index] = RecordResult;
+ if (RecordResult)
+ {
+ // TODO: Store remote record locally if StoreLocal is requested
+
+ for (size_t ValueIndex = 0; ValueIndex < RecordResult->Values.size(); ++ValueIndex)
+ {
+ // TODO: Check if we received the chunk as part of the RecordResult
+
+ const auto& Value = RecordResult->Values[ValueIndex];
+ AsyncGetLocalChunk(
+ Value.Id,
+ Value.RawHash,
+ Value.RawSize,
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AsyncGetRemoteChunk, &AcceptRemoteChunk, &Request, &Result, Index, ValueIndex, &Value](
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ if (ValueResult)
+ {
+ Result.Results[Index]->Values[ValueIndex] = {.Id = Value.Id,
+ .RawHash = ValueResult->RawHash,
+ .RawSize = ValueResult->RawSize,
+ .Body = ValueResult->Body};
+ return;
+ }
+ AsyncGetRemoteChunk(
+ Request.Namespace,
+ Request.Requests[Index].Key,
+ Value.Id,
+ Value.RawHash,
+ 0,
+ ~uint64_t(0),
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AcceptRemoteChunk, &Request, &Value, &Result, Index, ValueIndex](
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ AcceptRemoteChunk(Request, Result, Index, Value.Id, ValueIndex, ValueResult);
+ });
+ });
+ }
+ }
+ });
+ }
+ });
+ }
+
+ // We should have an more effective wait mechanism than sleep
+ while (WorkCounter.load() > 0)
+ {
+ Sleep(1);
+ }
+}
+
#endif
void