aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--zenserver/cache/structuredcache.cpp358
-rw-r--r--zenutil/cache/cacherequests.cpp76
-rw-r--r--zenutil/include/zenutil/cache/cacherequests.h30
3 files changed, 412 insertions, 52 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 50ecd41c2..a63a7d719 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -17,6 +17,7 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
#include <zenutil/cache/cache.h>
+#include <zenutil/zenserverprocess.h>
//#include "cachekey.h"
#include "monitoring/httpstats.h"
@@ -38,6 +39,8 @@
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zenutil/cache/cacherequests.h>
+# include <zencore/workthreadpool.h>
#endif
namespace zen {
@@ -2520,6 +2523,361 @@ TEST_CASE("z$service.parse.relative.Uri")
Invalid));
}
+TEST_CASE("z$.async")
+{
+ ScopedTemporaryDirectory TempDir;
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+ ZenCacheStore CacheStore(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
+
+ ZenServerEnvironment TestEnv;
+ std::filesystem::path ProgramBaseDir = GetRunningExecutablePath().parent_path();
+ std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test";
+
+ TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir);
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+ const uint16_t RemotePortNumber = 13338;
+
+ const auto RemoteBaseUri = fmt::format("http://localhost:{}", RemotePortNumber);
+
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetTestDir(RemoteDataDir);
+ RemoteInstance.SpawnServer(RemotePortNumber);
+ RemoteInstance.WaitUntilReady();
+
+ auto UpstreamCache = UpstreamCache::Create({}, CacheStore, CidStore);
+ std::vector<std::string> UpstreamUrls({RemoteBaseUri});
+ auto ZenUpstreamEndpoint = UpstreamEndpoint::CreateZenEndpoint({.Name = "ZenUpstream",
+ .Urls = UpstreamUrls,
+ .ConnectTimeout = std::chrono::milliseconds(1000),
+ .Timeout = std::chrono::milliseconds(1000)});
+ UpstreamCache->RegisterEndpoint(std::move(ZenUpstreamEndpoint));
+
+ std::atomic_uint32_t WorkCounter;
+ WorkerThreadPool WorkerPool(8);
+ auto AsyncGetLocalRecord = [&WorkerPool, &WorkCounter, &CacheStore](
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const CachePolicy DefaultPolicy,
+ const std::optional<CacheRecordPolicy>& Policy,
+ std::function<void(std::optional<cacherequests::CacheRecord>)>&& OnComplete) {
+ CachePolicy EffectivePolicy = Policy.has_value() ? Policy->GetRecordPolicy() : DefaultPolicy;
+ if (EnumHasAllFlags(EffectivePolicy, CachePolicy::QueryLocal))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork([&WorkCounter, &CacheStore, Namespace, Key, DefaultPolicy, Policy, OnComplete]() {
+ ZenCacheValue CacheValue;
+ if (CacheStore.Get(Namespace, Key.Bucket, Key.Hash, CacheValue))
+ {
+ if (CacheValue.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ CbObjectView Reader(CacheValue.Value.GetData());
+ cacherequests::CacheRecord Record;
+ if (Record.Parse(Reader))
+ {
+ OnComplete(Record);
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ }
+ }
+ OnComplete({});
+ WorkCounter.fetch_sub(1);
+ });
+ return;
+ }
+ OnComplete({});
+ };
+
+ auto AsyncGetLocalChunk = [&WorkerPool, &WorkCounter, &CidStore](
+ Oid Id,
+ IoHash RawHash,
+ uint64_t RawSize,
+ const CachePolicy DefaultPolicy,
+ const std::optional<CacheRecordPolicy>& RecordPolicy,
+ std::function<void(std::optional<cacherequests::CacheValueResult>)>&& OnComplete) {
+ CachePolicy EffectivePolicy = RecordPolicy.has_value() ? RecordPolicy->GetValuePolicy(Id) : DefaultPolicy;
+ if (EnumHasAllFlags(EffectivePolicy, CachePolicy::QueryLocal))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork([&WorkCounter, &CidStore, RawHash, RawSize, OnComplete, EffectivePolicy]() {
+ if (EnumHasAnyFlags(EffectivePolicy, CachePolicy::SkipData))
+ {
+ if (CidStore.ContainsChunk(RawHash))
+ {
+ OnComplete(cacherequests::CacheValueResult{.RawSize = RawSize, .RawHash = RawHash});
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ }
+ IoBuffer Chunk = CidStore.FindChunkByCid(RawHash);
+ if (Chunk)
+ {
+ if (CompressedBuffer Body = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))
+ {
+ OnComplete(cacherequests::CacheValueResult{.RawSize = RawSize, .RawHash = RawHash, .Body = Body});
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ }
+ OnComplete({});
+ WorkCounter.fetch_sub(1);
+ return;
+ });
+ }
+ OnComplete({});
+ };
+
+ auto AsyncGetRemoteRecord = [&WorkerPool, &WorkCounter, &UpstreamCache](
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const CachePolicy DefaultPolicy,
+ const std::optional<CacheRecordPolicy>& Policy,
+ std::function<void(std::optional<cacherequests::GetCacheRecordResult>)>&& OnComplete) {
+ CachePolicy EffectivePolicy = Policy.has_value() ? Policy->GetRecordPolicy() : DefaultPolicy;
+ if (EnumHasAnyFlags(EffectivePolicy, CachePolicy::QueryRemote))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork([&WorkCounter, &UpstreamCache, Namespace, Key, OnComplete]() {
+ CacheRecordPolicy UpstreamPolicy; // TODO, build proper policy
+ CacheKeyRequest UpstreamRequest = {.Key = {.Bucket = Key.Bucket, .Hash = Key.Hash}, .Policy = UpstreamPolicy};
+ std::vector<CacheKeyRequest*> UpstreamRequestPtrs = {&UpstreamRequest};
+ UpstreamCache->GetCacheRecords(Namespace, UpstreamRequestPtrs, [&](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Package)
+ {
+ OnComplete({});
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ cacherequests::CacheRecord Record;
+ if (!Record.Parse(Params.Record))
+ {
+ OnComplete({});
+ WorkCounter.fetch_sub(1);
+ return;
+ }
+ cacherequests::GetCacheRecordResult RecordResult = {.Key = Key};
+ RecordResult.Values.resize(Record.Values.size());
+
+ for (size_t ValueIndex = 0; ValueIndex < Record.Values.size(); ++ValueIndex)
+ {
+ RecordResult.Values[ValueIndex].Id = Record.Values[ValueIndex].Id;
+ RecordResult.Values[ValueIndex].RawHash = Record.Values[ValueIndex].RawHash;
+ RecordResult.Values[ValueIndex].RawSize = Record.Values[ValueIndex].RawSize;
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(RecordResult.Values[ValueIndex].RawHash))
+ {
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ {
+ RecordResult.Values[ValueIndex].Body = Compressed;
+ }
+ }
+ }
+ OnComplete(RecordResult);
+ WorkCounter.fetch_sub(1);
+ });
+ });
+ return;
+ }
+ OnComplete({});
+ };
+
+ auto AsyncGetRemoteChunk = [&WorkerPool, &WorkCounter, &UpstreamCache](
+ std::string_view Namespace,
+ const CacheKey& Key,
+ const Oid& Id,
+ const IoHash& RawHash,
+ const uint64_t RawOffset,
+ const uint64_t RawSize,
+ const CachePolicy DefaultPolicy,
+ const std::optional<CacheRecordPolicy>& RecordPolicy,
+ std::function<void(std::optional<cacherequests::CacheValueResult>)>&& OnComplete) {
+ CachePolicy EffectivePolicy = RecordPolicy.has_value() ? RecordPolicy->GetValuePolicy(Id) : DefaultPolicy;
+ if (EnumHasAnyFlags(EffectivePolicy, CachePolicy::QueryRemote))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork(
+ [&WorkCounter, &UpstreamCache, Namespace, Key, RawHash, RawOffset, RawSize, Id, EffectivePolicy, OnComplete]() {
+ CacheChunkRequest UpstreamChunkRequest = {.Key = Key,
+ .ChunkId = RawHash,
+ .ValueId = Id,
+ .RawOffset = RawOffset,
+ .RawSize = RawSize,
+ .Policy = ConvertToUpstream(EffectivePolicy)};
+ std::vector<CacheChunkRequest*> CacheChunkRequestPtrs{&UpstreamChunkRequest};
+ UpstreamCache->GetCacheChunks(
+ Namespace,
+ CacheChunkRequestPtrs,
+ [EffectivePolicy, &OnComplete](CacheChunkGetCompleteParams&& Params) {
+ if (Params.RawHash == Params.RawHash.Zero)
+ {
+ OnComplete({});
+ return;
+ }
+ if (EnumHasAllFlags(EffectivePolicy, CachePolicy::SkipData) &&
+ !EnumHasAllFlags(EffectivePolicy, CachePolicy::StoreLocal))
+ {
+ OnComplete(cacherequests::CacheValueResult{.RawSize = Params.RawSize, .RawHash = Params.RawHash});
+ return;
+ }
+ CompressedBuffer Body =
+ Params.Value ? CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)) : CompressedBuffer{};
+ OnComplete(cacherequests::CacheValueResult{.RawSize = Params.RawSize, .RawHash = Params.RawHash, .Body = Body});
+ });
+ WorkCounter.fetch_sub(1);
+ });
+ return;
+ }
+ OnComplete({});
+ };
+
+ auto AsyncStoreLocalChunk =
+ [&](Oid ValueId, CachePolicy DefaultPolicy, std::optional<CacheRecordPolicy> Policy, CompressedBuffer Body) {
+ CachePolicy EffectivePolicy = Policy.has_value() ? Policy->GetValuePolicy(ValueId) : DefaultPolicy;
+ if (Body && EnumHasAllFlags(EffectivePolicy, CachePolicy::StoreLocal))
+ {
+ WorkCounter.fetch_add(1);
+ WorkerPool.ScheduleWork([&WorkCounter, &CidStore, Body]() {
+ CidStore.AddChunk(Body);
+ WorkCounter.fetch_sub(1);
+ });
+ }
+ };
+
+ auto AcceptRemoteChunk = [&](const cacherequests::GetCacheRecordsRequest& Request,
+ cacherequests::GetCacheRecordsResult& Result,
+ size_t Index,
+ Oid ValueId,
+ size_t ValueIndex,
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ if (ValueResult.has_value())
+ {
+ Result.Results[Index]->Values[ValueIndex] = {.Id = ValueId,
+ .RawHash = ValueResult->RawHash,
+ .RawSize = ValueResult->RawSize,
+ .Body = ValueResult->Body};
+ AsyncStoreLocalChunk(ValueId, Request.DefaultPolicy, Request.Requests[Index].Policy, ValueResult->Body);
+ }
+ };
+
+ const cacherequests::GetCacheRecordsRequest Request = {
+ .Namespace = "biff",
+ .Requests = {{.Key = {.Bucket = "bob", .Hash = IoHash::FromHexString("58493681ae8f4c5b38ac58493681ae8f4c5b38ac")}}}};
+ cacherequests::GetCacheRecordsResult Result;
+
+ Result.Results.resize(Request.Requests.size());
+
+ for (size_t Index = 0; Index < Request.Requests.size(); ++Index)
+ {
+ AsyncGetLocalRecord(
+ Request.Namespace,
+ Request.Requests[Index].Key,
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AsyncGetLocalChunk, &AsyncGetRemoteChunk, &AsyncGetRemoteRecord, &AcceptRemoteChunk, &Request, Index, &Result](
+ std::optional<cacherequests::CacheRecord> Record) {
+ if (Record)
+ {
+ Result.Results[Index] = cacherequests::GetCacheRecordResult{.Key = Request.Requests[Index].Key};
+ Result.Results[Index]->Values.resize(Record->Values.size());
+ for (size_t ValueIndex = 0; ValueIndex < Record->Values.size(); ++ValueIndex)
+ {
+ const auto& Value = Record->Values[ValueIndex];
+ AsyncGetLocalChunk(
+ Value.Id,
+ Value.RawHash,
+ Value.RawSize,
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AsyncGetRemoteChunk, &AcceptRemoteChunk, &Request, &Result, Index, ValueIndex, &Value](
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ if (ValueResult)
+ {
+ Result.Results[Index]->Values[ValueIndex] = {.Id = Value.Id,
+ .RawHash = ValueResult->RawHash,
+ .RawSize = ValueResult->RawSize,
+ .Body = ValueResult->Body};
+ return;
+ }
+ AsyncGetRemoteChunk(Request.Namespace,
+ Request.Requests[Index].Key,
+ Value.Id,
+ Value.RawHash,
+ 0,
+ ~uint64_t(0),
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AcceptRemoteChunk, &Request, &Value, &Result, Index, ValueIndex](
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ AcceptRemoteChunk(Request, Result, Index, Value.Id, ValueIndex, ValueResult);
+ });
+ });
+ }
+ }
+ else
+ {
+ AsyncGetRemoteRecord(
+ Request.Namespace,
+ Request.Requests[Index].Key,
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AsyncGetLocalChunk, &AsyncGetRemoteChunk, &AcceptRemoteChunk, &Result, &Request, Index](
+ std::optional<cacherequests::GetCacheRecordResult>&& RecordResult) {
+ Result.Results[Index] = RecordResult;
+ if (RecordResult)
+ {
+ // TODO: Store remote record locally if StoreLocal is requested
+
+ for (size_t ValueIndex = 0; ValueIndex < RecordResult->Values.size(); ++ValueIndex)
+ {
+ // TODO: Check if we received the chunk as part of the RecordResult
+
+ const auto& Value = RecordResult->Values[ValueIndex];
+ AsyncGetLocalChunk(
+ Value.Id,
+ Value.RawHash,
+ Value.RawSize,
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AsyncGetRemoteChunk, &AcceptRemoteChunk, &Request, &Result, Index, ValueIndex, &Value](
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ if (ValueResult)
+ {
+ Result.Results[Index]->Values[ValueIndex] = {.Id = Value.Id,
+ .RawHash = ValueResult->RawHash,
+ .RawSize = ValueResult->RawSize,
+ .Body = ValueResult->Body};
+ return;
+ }
+ AsyncGetRemoteChunk(
+ Request.Namespace,
+ Request.Requests[Index].Key,
+ Value.Id,
+ Value.RawHash,
+ 0,
+ ~uint64_t(0),
+ Request.DefaultPolicy,
+ Request.Requests[Index].Policy,
+ [&CidStore, &AcceptRemoteChunk, &Request, &Value, &Result, Index, ValueIndex](
+ std::optional<cacherequests::CacheValueResult> ValueResult) {
+ AcceptRemoteChunk(Request, Result, Index, Value.Id, ValueIndex, ValueResult);
+ });
+ });
+ }
+ }
+ });
+ }
+ });
+ }
+
+ // We should have an more effective wait mechanism than sleep
+ while (WorkCounter.load() > 0)
+ {
+ Sleep(1);
+ }
+}
+
#endif
void
diff --git a/zenutil/cache/cacherequests.cpp b/zenutil/cache/cacherequests.cpp
index 2c14ef35b..e764e80c4 100644
--- a/zenutil/cache/cacherequests.cpp
+++ b/zenutil/cache/cacherequests.cpp
@@ -20,6 +20,8 @@ namespace zen {
namespace cacherequests {
+ using namespace std::literals;
+
namespace {
constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
@@ -970,43 +972,43 @@ namespace cacherequests {
}
}
- // bool CacheRecord::Parse(CbObjectView& Reader)
- // {
- // CbObjectView KeyView = Reader["Key"].AsObjectView();
- //
- // if (!GetRequestCacheKey(KeyView, Key))
- // {
- // return false;
- // }
- // CbArrayView ValuesArray = Reader["Values"].AsArrayView();
- // Values.reserve(ValuesArray.Num());
- // for (CbFieldView Value : ValuesArray)
- // {
- // CbObjectView ObjectView = Value.AsObjectView();
- // Values.push_back({.Id = ObjectView["Id"].AsObjectId(),
- // .RawHash = ObjectView["RawHash"].AsHash(),
- // .RawSize = ObjectView["RawSize"].AsUInt64()});
- // }
- // return true;
- // }
- //
- // bool CacheRecord::Format(CbObjectWriter& Writer) const
- // {
- // WriteCacheRequestKey(Writer, Key);
- // Writer.BeginArray("Values");
- // for (const CacheRecordValue& Value : Values)
- // {
- // Writer.BeginObject();
- // {
- // Writer.AddObjectId("Id", Value.Id);
- // Writer.AddHash("RawHash", Value.RawHash);
- // Writer.AddInteger("RawSize", Value.RawSize);
- // }
- // Writer.EndObject();
- // }
- // Writer.EndArray();
- // return true;
- // }
+ bool CacheRecord::Parse(const CbObjectView& Reader)
+ {
+ CbObjectView KeyView = Reader["Key"sv].AsObjectView();
+
+ if (!GetRequestCacheKey(KeyView, Key))
+ {
+ return false;
+ }
+ CbArrayView ValuesArray = Reader["Values"sv].AsArrayView();
+ Values.reserve(ValuesArray.Num());
+ for (CbFieldView Value : ValuesArray)
+ {
+ CbObjectView ObjectView = Value.AsObjectView();
+ Values.push_back({.Id = ObjectView["Id"sv].AsObjectId(),
+ .RawHash = ObjectView["RawHash"sv].AsBinaryAttachment(),
+ .RawSize = ObjectView["RawSize"sv].AsUInt64()});
+ }
+ return true;
+ }
+
+ bool CacheRecord::Format(CbObjectWriter& Writer) const
+ {
+ WriteCacheRequestKey(Writer, Key);
+ Writer.BeginArray("Values"sv);
+ for (const CacheRecordValue& Value : Values)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddObjectId("Id"sv, Value.Id);
+ Writer.AddBinaryAttachment("RawHash"sv, Value.RawHash);
+ Writer.AddInteger("RawSize"sv, Value.RawSize);
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ return true;
+ }
#if ZEN_WITH_TESTS
diff --git a/zenutil/include/zenutil/cache/cacherequests.h b/zenutil/include/zenutil/cache/cacherequests.h
index ffb0f8d5f..4566eba38 100644
--- a/zenutil/include/zenutil/cache/cacherequests.h
+++ b/zenutil/include/zenutil/cache/cacherequests.h
@@ -245,21 +245,21 @@ namespace cacherequests {
//////////////////////////////////////////////////////////////////////////
- // struct CacheRecordValue
- // {
- // Oid Id = Oid::Zero;
- // IoHash RawHash = IoHash::Zero;
- // uint64_t RawSize = 0;
- // };
- //
- // struct CacheRecord
- // {
- // CacheKey Key = CacheKey::Empty;
- // std::vector<CacheRecordValue> Values;
- //
- // bool Parse(CbObjectView& Reader);
- // bool Format(CbObjectWriter& Writer) const;
- // };
+ struct CacheRecordValue
+ {
+ Oid Id = Oid::Zero;
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0;
+ };
+
+ struct CacheRecord
+ {
+ CacheKey Key = CacheKey::Empty;
+ std::vector<CacheRecordValue> Values;
+
+ bool Parse(const CbObjectView& Reader);
+ bool Format(CbObjectWriter& Writer) const;
+ };
} // namespace cacherequests