diff options
| author | Dan Engelbrecht <[email protected]> | 2022-09-22 00:51:23 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-09-22 10:52:15 +0200 |
| commit | 75d1e777af58922bbba00ebbdc05bdc0fd768b9f (patch) | |
| tree | ecbdc366194193e19caf8387b54dcdeaa8ce0e57 | |
| parent | Add elapsed seconds per individual request from upstream (#167) (diff) | |
| download | zen-de/auto-batch-upstream.tar.xz zen-de/auto-batch-upstream.zip | |
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 358 | ||||
| -rw-r--r-- | zenutil/cache/cacherequests.cpp | 76 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cacherequests.h | 30 |
3 files changed, 412 insertions, 52 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 50ecd41c2..a63a7d719 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -17,6 +17,7 @@ #include <zenhttp/httpserver.h> #include <zenhttp/httpshared.h> #include <zenutil/cache/cache.h> +#include <zenutil/zenserverprocess.h> //#include "cachekey.h" #include "monitoring/httpstats.h" @@ -38,6 +39,8 @@ #if ZEN_WITH_TESTS # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zenutil/cache/cacherequests.h> +# include <zencore/workthreadpool.h> #endif namespace zen { @@ -2520,6 +2523,361 @@ TEST_CASE("z$service.parse.relative.Uri") Invalid)); } +TEST_CASE("z$.async") +{ + ScopedTemporaryDirectory TempDir; + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + ZenCacheStore CacheStore(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); + + ZenServerEnvironment TestEnv; + std::filesystem::path ProgramBaseDir = GetRunningExecutablePath().parent_path(); + std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test"; + + TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + const uint16_t RemotePortNumber = 13338; + + const auto RemoteBaseUri = fmt::format("http://localhost:{}", RemotePortNumber); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + RemoteInstance.SpawnServer(RemotePortNumber); + RemoteInstance.WaitUntilReady(); + + auto UpstreamCache = UpstreamCache::Create({}, CacheStore, CidStore); + std::vector<std::string> UpstreamUrls({RemoteBaseUri}); + auto ZenUpstreamEndpoint = UpstreamEndpoint::CreateZenEndpoint({.Name = "ZenUpstream", + .Urls = UpstreamUrls, + .ConnectTimeout = std::chrono::milliseconds(1000), + .Timeout = std::chrono::milliseconds(1000)}); + UpstreamCache->RegisterEndpoint(std::move(ZenUpstreamEndpoint)); + + std::atomic_uint32_t WorkCounter; + WorkerThreadPool WorkerPool(8); + auto AsyncGetLocalRecord = [&WorkerPool, &WorkCounter, &CacheStore]( + std::string_view Namespace, + const CacheKey& Key, + const CachePolicy DefaultPolicy, + const std::optional<CacheRecordPolicy>& Policy, + std::function<void(std::optional<cacherequests::CacheRecord>)>&& OnComplete) { + CachePolicy EffectivePolicy = Policy.has_value() ? Policy->GetRecordPolicy() : DefaultPolicy; + if (EnumHasAllFlags(EffectivePolicy, CachePolicy::QueryLocal)) + { + WorkCounter.fetch_add(1); + WorkerPool.ScheduleWork([&WorkCounter, &CacheStore, Namespace, Key, DefaultPolicy, Policy, OnComplete]() { + ZenCacheValue CacheValue; + if (CacheStore.Get(Namespace, Key.Bucket, Key.Hash, CacheValue)) + { + if (CacheValue.Value.GetContentType() == ZenContentType::kCbObject) + { + CbObjectView Reader(CacheValue.Value.GetData()); + cacherequests::CacheRecord Record; + if (Record.Parse(Reader)) + { + OnComplete(Record); + WorkCounter.fetch_sub(1); + return; + } + } + } + OnComplete({}); + WorkCounter.fetch_sub(1); + }); + return; + } + OnComplete({}); + }; + + auto AsyncGetLocalChunk = [&WorkerPool, &WorkCounter, &CidStore]( + Oid Id, + IoHash RawHash, + uint64_t RawSize, + const CachePolicy DefaultPolicy, + const std::optional<CacheRecordPolicy>& RecordPolicy, + std::function<void(std::optional<cacherequests::CacheValueResult>)>&& OnComplete) { + CachePolicy EffectivePolicy = RecordPolicy.has_value() ? RecordPolicy->GetValuePolicy(Id) : DefaultPolicy; + if (EnumHasAllFlags(EffectivePolicy, CachePolicy::QueryLocal)) + { + WorkCounter.fetch_add(1); + WorkerPool.ScheduleWork([&WorkCounter, &CidStore, RawHash, RawSize, OnComplete, EffectivePolicy]() { + if (EnumHasAnyFlags(EffectivePolicy, CachePolicy::SkipData)) + { + if (CidStore.ContainsChunk(RawHash)) + { + OnComplete(cacherequests::CacheValueResult{.RawSize = RawSize, .RawHash = RawHash}); + WorkCounter.fetch_sub(1); + return; + } + } + IoBuffer Chunk = CidStore.FindChunkByCid(RawHash); + if (Chunk) + { + if (CompressedBuffer Body = CompressedBuffer::FromCompressed(SharedBuffer(Chunk))) + { + OnComplete(cacherequests::CacheValueResult{.RawSize = RawSize, .RawHash = RawHash, .Body = Body}); + WorkCounter.fetch_sub(1); + return; + } + } + OnComplete({}); + WorkCounter.fetch_sub(1); + return; + }); + } + OnComplete({}); + }; + + auto AsyncGetRemoteRecord = [&WorkerPool, &WorkCounter, &UpstreamCache]( + std::string_view Namespace, + const CacheKey& Key, + const CachePolicy DefaultPolicy, + const std::optional<CacheRecordPolicy>& Policy, + std::function<void(std::optional<cacherequests::GetCacheRecordResult>)>&& OnComplete) { + CachePolicy EffectivePolicy = Policy.has_value() ? Policy->GetRecordPolicy() : DefaultPolicy; + if (EnumHasAnyFlags(EffectivePolicy, CachePolicy::QueryRemote)) + { + WorkCounter.fetch_add(1); + WorkerPool.ScheduleWork([&WorkCounter, &UpstreamCache, Namespace, Key, OnComplete]() { + CacheRecordPolicy UpstreamPolicy; // TODO, build proper policy + CacheKeyRequest UpstreamRequest = {.Key = {.Bucket = Key.Bucket, .Hash = Key.Hash}, .Policy = UpstreamPolicy}; + std::vector<CacheKeyRequest*> UpstreamRequestPtrs = {&UpstreamRequest}; + UpstreamCache->GetCacheRecords(Namespace, UpstreamRequestPtrs, [&](CacheRecordGetCompleteParams&& Params) { + if (!Params.Package) + { + OnComplete({}); + WorkCounter.fetch_sub(1); + return; + } + cacherequests::CacheRecord Record; + if (!Record.Parse(Params.Record)) + { + OnComplete({}); + WorkCounter.fetch_sub(1); + return; + } + cacherequests::GetCacheRecordResult RecordResult = {.Key = Key}; + RecordResult.Values.resize(Record.Values.size()); + + for (size_t ValueIndex = 0; ValueIndex < Record.Values.size(); ++ValueIndex) + { + RecordResult.Values[ValueIndex].Id = Record.Values[ValueIndex].Id; + RecordResult.Values[ValueIndex].RawHash = Record.Values[ValueIndex].RawHash; + RecordResult.Values[ValueIndex].RawSize = Record.Values[ValueIndex].RawSize; + if (const CbAttachment* Attachment = Params.Package.FindAttachment(RecordResult.Values[ValueIndex].RawHash)) + { + if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) + { + RecordResult.Values[ValueIndex].Body = Compressed; + } + } + } + OnComplete(RecordResult); + WorkCounter.fetch_sub(1); + }); + }); + return; + } + OnComplete({}); + }; + + auto AsyncGetRemoteChunk = [&WorkerPool, &WorkCounter, &UpstreamCache]( + std::string_view Namespace, + const CacheKey& Key, + const Oid& Id, + const IoHash& RawHash, + const uint64_t RawOffset, + const uint64_t RawSize, + const CachePolicy DefaultPolicy, + const std::optional<CacheRecordPolicy>& RecordPolicy, + std::function<void(std::optional<cacherequests::CacheValueResult>)>&& OnComplete) { + CachePolicy EffectivePolicy = RecordPolicy.has_value() ? RecordPolicy->GetValuePolicy(Id) : DefaultPolicy; + if (EnumHasAnyFlags(EffectivePolicy, CachePolicy::QueryRemote)) + { + WorkCounter.fetch_add(1); + WorkerPool.ScheduleWork( + [&WorkCounter, &UpstreamCache, Namespace, Key, RawHash, RawOffset, RawSize, Id, EffectivePolicy, OnComplete]() { + CacheChunkRequest UpstreamChunkRequest = {.Key = Key, + .ChunkId = RawHash, + .ValueId = Id, + .RawOffset = RawOffset, + .RawSize = RawSize, + .Policy = ConvertToUpstream(EffectivePolicy)}; + std::vector<CacheChunkRequest*> CacheChunkRequestPtrs{&UpstreamChunkRequest}; + UpstreamCache->GetCacheChunks( + Namespace, + CacheChunkRequestPtrs, + [EffectivePolicy, &OnComplete](CacheChunkGetCompleteParams&& Params) { + if (Params.RawHash == Params.RawHash.Zero) + { + OnComplete({}); + return; + } + if (EnumHasAllFlags(EffectivePolicy, CachePolicy::SkipData) && + !EnumHasAllFlags(EffectivePolicy, CachePolicy::StoreLocal)) + { + OnComplete(cacherequests::CacheValueResult{.RawSize = Params.RawSize, .RawHash = Params.RawHash}); + return; + } + CompressedBuffer Body = + Params.Value ? CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)) : CompressedBuffer{}; + OnComplete(cacherequests::CacheValueResult{.RawSize = Params.RawSize, .RawHash = Params.RawHash, .Body = Body}); + }); + WorkCounter.fetch_sub(1); + }); + return; + } + OnComplete({}); + }; + + auto AsyncStoreLocalChunk = + [&](Oid ValueId, CachePolicy DefaultPolicy, std::optional<CacheRecordPolicy> Policy, CompressedBuffer Body) { + CachePolicy EffectivePolicy = Policy.has_value() ? Policy->GetValuePolicy(ValueId) : DefaultPolicy; + if (Body && EnumHasAllFlags(EffectivePolicy, CachePolicy::StoreLocal)) + { + WorkCounter.fetch_add(1); + WorkerPool.ScheduleWork([&WorkCounter, &CidStore, Body]() { + CidStore.AddChunk(Body); + WorkCounter.fetch_sub(1); + }); + } + }; + + auto AcceptRemoteChunk = [&](const cacherequests::GetCacheRecordsRequest& Request, + cacherequests::GetCacheRecordsResult& Result, + size_t Index, + Oid ValueId, + size_t ValueIndex, + std::optional<cacherequests::CacheValueResult> ValueResult) { + if (ValueResult.has_value()) + { + Result.Results[Index]->Values[ValueIndex] = {.Id = ValueId, + .RawHash = ValueResult->RawHash, + .RawSize = ValueResult->RawSize, + .Body = ValueResult->Body}; + AsyncStoreLocalChunk(ValueId, Request.DefaultPolicy, Request.Requests[Index].Policy, ValueResult->Body); + } + }; + + const cacherequests::GetCacheRecordsRequest Request = { + .Namespace = "biff", + .Requests = {{.Key = {.Bucket = "bob", .Hash = IoHash::FromHexString("58493681ae8f4c5b38ac58493681ae8f4c5b38ac")}}}}; + cacherequests::GetCacheRecordsResult Result; + + Result.Results.resize(Request.Requests.size()); + + for (size_t Index = 0; Index < Request.Requests.size(); ++Index) + { + AsyncGetLocalRecord( + Request.Namespace, + Request.Requests[Index].Key, + Request.DefaultPolicy, + Request.Requests[Index].Policy, + [&CidStore, &AsyncGetLocalChunk, &AsyncGetRemoteChunk, &AsyncGetRemoteRecord, &AcceptRemoteChunk, &Request, Index, &Result]( + std::optional<cacherequests::CacheRecord> Record) { + if (Record) + { + Result.Results[Index] = cacherequests::GetCacheRecordResult{.Key = Request.Requests[Index].Key}; + Result.Results[Index]->Values.resize(Record->Values.size()); + for (size_t ValueIndex = 0; ValueIndex < Record->Values.size(); ++ValueIndex) + { + const auto& Value = Record->Values[ValueIndex]; + AsyncGetLocalChunk( + Value.Id, + Value.RawHash, + Value.RawSize, + Request.DefaultPolicy, + Request.Requests[Index].Policy, + [&CidStore, &AsyncGetRemoteChunk, &AcceptRemoteChunk, &Request, &Result, Index, ValueIndex, &Value]( + std::optional<cacherequests::CacheValueResult> ValueResult) { + if (ValueResult) + { + Result.Results[Index]->Values[ValueIndex] = {.Id = Value.Id, + .RawHash = ValueResult->RawHash, + .RawSize = ValueResult->RawSize, + .Body = ValueResult->Body}; + return; + } + AsyncGetRemoteChunk(Request.Namespace, + Request.Requests[Index].Key, + Value.Id, + Value.RawHash, + 0, + ~uint64_t(0), + Request.DefaultPolicy, + Request.Requests[Index].Policy, + [&CidStore, &AcceptRemoteChunk, &Request, &Value, &Result, Index, ValueIndex]( + std::optional<cacherequests::CacheValueResult> ValueResult) { + AcceptRemoteChunk(Request, Result, Index, Value.Id, ValueIndex, ValueResult); + }); + }); + } + } + else + { + AsyncGetRemoteRecord( + Request.Namespace, + Request.Requests[Index].Key, + Request.DefaultPolicy, + Request.Requests[Index].Policy, + [&CidStore, &AsyncGetLocalChunk, &AsyncGetRemoteChunk, &AcceptRemoteChunk, &Result, &Request, Index]( + std::optional<cacherequests::GetCacheRecordResult>&& RecordResult) { + Result.Results[Index] = RecordResult; + if (RecordResult) + { + // TODO: Store remote record locally if StoreLocal is requested + + for (size_t ValueIndex = 0; ValueIndex < RecordResult->Values.size(); ++ValueIndex) + { + // TODO: Check if we received the chunk as part of the RecordResult + + const auto& Value = RecordResult->Values[ValueIndex]; + AsyncGetLocalChunk( + Value.Id, + Value.RawHash, + Value.RawSize, + Request.DefaultPolicy, + Request.Requests[Index].Policy, + [&CidStore, &AsyncGetRemoteChunk, &AcceptRemoteChunk, &Request, &Result, Index, ValueIndex, &Value]( + std::optional<cacherequests::CacheValueResult> ValueResult) { + if (ValueResult) + { + Result.Results[Index]->Values[ValueIndex] = {.Id = Value.Id, + .RawHash = ValueResult->RawHash, + .RawSize = ValueResult->RawSize, + .Body = ValueResult->Body}; + return; + } + AsyncGetRemoteChunk( + Request.Namespace, + Request.Requests[Index].Key, + Value.Id, + Value.RawHash, + 0, + ~uint64_t(0), + Request.DefaultPolicy, + Request.Requests[Index].Policy, + [&CidStore, &AcceptRemoteChunk, &Request, &Value, &Result, Index, ValueIndex]( + std::optional<cacherequests::CacheValueResult> ValueResult) { + AcceptRemoteChunk(Request, Result, Index, Value.Id, ValueIndex, ValueResult); + }); + }); + } + } + }); + } + }); + } + + // We should have an more effective wait mechanism than sleep + while (WorkCounter.load() > 0) + { + Sleep(1); + } +} + #endif void diff --git a/zenutil/cache/cacherequests.cpp b/zenutil/cache/cacherequests.cpp index 2c14ef35b..e764e80c4 100644 --- a/zenutil/cache/cacherequests.cpp +++ b/zenutil/cache/cacherequests.cpp @@ -20,6 +20,8 @@ namespace zen { namespace cacherequests { + using namespace std::literals; + namespace { constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; @@ -970,43 +972,43 @@ namespace cacherequests { } } - // bool CacheRecord::Parse(CbObjectView& Reader) - // { - // CbObjectView KeyView = Reader["Key"].AsObjectView(); - // - // if (!GetRequestCacheKey(KeyView, Key)) - // { - // return false; - // } - // CbArrayView ValuesArray = Reader["Values"].AsArrayView(); - // Values.reserve(ValuesArray.Num()); - // for (CbFieldView Value : ValuesArray) - // { - // CbObjectView ObjectView = Value.AsObjectView(); - // Values.push_back({.Id = ObjectView["Id"].AsObjectId(), - // .RawHash = ObjectView["RawHash"].AsHash(), - // .RawSize = ObjectView["RawSize"].AsUInt64()}); - // } - // return true; - // } - // - // bool CacheRecord::Format(CbObjectWriter& Writer) const - // { - // WriteCacheRequestKey(Writer, Key); - // Writer.BeginArray("Values"); - // for (const CacheRecordValue& Value : Values) - // { - // Writer.BeginObject(); - // { - // Writer.AddObjectId("Id", Value.Id); - // Writer.AddHash("RawHash", Value.RawHash); - // Writer.AddInteger("RawSize", Value.RawSize); - // } - // Writer.EndObject(); - // } - // Writer.EndArray(); - // return true; - // } + bool CacheRecord::Parse(const CbObjectView& Reader) + { + CbObjectView KeyView = Reader["Key"sv].AsObjectView(); + + if (!GetRequestCacheKey(KeyView, Key)) + { + return false; + } + CbArrayView ValuesArray = Reader["Values"sv].AsArrayView(); + Values.reserve(ValuesArray.Num()); + for (CbFieldView Value : ValuesArray) + { + CbObjectView ObjectView = Value.AsObjectView(); + Values.push_back({.Id = ObjectView["Id"sv].AsObjectId(), + .RawHash = ObjectView["RawHash"sv].AsBinaryAttachment(), + .RawSize = ObjectView["RawSize"sv].AsUInt64()}); + } + return true; + } + + bool CacheRecord::Format(CbObjectWriter& Writer) const + { + WriteCacheRequestKey(Writer, Key); + Writer.BeginArray("Values"sv); + for (const CacheRecordValue& Value : Values) + { + Writer.BeginObject(); + { + Writer.AddObjectId("Id"sv, Value.Id); + Writer.AddBinaryAttachment("RawHash"sv, Value.RawHash); + Writer.AddInteger("RawSize"sv, Value.RawSize); + } + Writer.EndObject(); + } + Writer.EndArray(); + return true; + } #if ZEN_WITH_TESTS diff --git a/zenutil/include/zenutil/cache/cacherequests.h b/zenutil/include/zenutil/cache/cacherequests.h index ffb0f8d5f..4566eba38 100644 --- a/zenutil/include/zenutil/cache/cacherequests.h +++ b/zenutil/include/zenutil/cache/cacherequests.h @@ -245,21 +245,21 @@ namespace cacherequests { ////////////////////////////////////////////////////////////////////////// - // struct CacheRecordValue - // { - // Oid Id = Oid::Zero; - // IoHash RawHash = IoHash::Zero; - // uint64_t RawSize = 0; - // }; - // - // struct CacheRecord - // { - // CacheKey Key = CacheKey::Empty; - // std::vector<CacheRecordValue> Values; - // - // bool Parse(CbObjectView& Reader); - // bool Format(CbObjectWriter& Writer) const; - // }; + struct CacheRecordValue + { + Oid Id = Oid::Zero; + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; + }; + + struct CacheRecord + { + CacheKey Key = CacheKey::Empty; + std::vector<CacheRecordValue> Values; + + bool Parse(const CbObjectView& Reader); + bool Format(CbObjectWriter& Writer) const; + }; } // namespace cacherequests |